Skip to content

Commit

Permalink
[chore][connector/routing] Add internal package plogutil for splittin…
Browse files Browse the repository at this point in the history
…g logs
  • Loading branch information
djaglowski committed Oct 23, 2024
1 parent 34c776f commit 8a43d36
Show file tree
Hide file tree
Showing 56 changed files with 4,378 additions and 0 deletions.
4 changes: 4 additions & 0 deletions connector/routingconnector/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/connector/routi
go 1.22.0

require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.112.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.112.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.112.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.112.0
go.opentelemetry.io/collector/confmap v1.18.0
Expand All @@ -21,6 +23,7 @@ require (
github.com/alecthomas/participle/v2 v2.1.1 // indirect
github.com/antchfx/xmlquery v1.4.2 // indirect
github.com/antchfx/xpath v1.3.2 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/elastic/go-grok v0.3.1 // indirect
github.com/elastic/lunes v0.1.0 // indirect
Expand All @@ -44,6 +47,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.112.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.112.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/ua-parser/uap-go v0.0.0-20240611065828-3a4781585db6 // indirect
go.opentelemetry.io/collector v0.112.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions connector/routingconnector/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 63 additions & 0 deletions connector/routingconnector/internal/plogutil/logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package plogutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector/internal/plogutil"

import (
"go.opentelemetry.io/collector/pdata/plog"
)

// MoveResourcesIf calls f sequentially for each ResourceLogs present in the first plog.Logs.
// If f returns true, the element is removed from the first plog.Logs and added to the second plog.Logs.
func MoveResourcesIf(from, to plog.Logs, f func(plog.ResourceLogs) bool) {
from.ResourceLogs().RemoveIf(func(rl plog.ResourceLogs) bool {
if !f(rl) {
return false
}
rl.CopyTo(to.ResourceLogs().AppendEmpty())
return true
})
}

// MoveRecordsWithContextIf calls f sequentially for each LogRecord present in the first plog.Logs.
// If f returns true, the element is removed from the first plog.Logs and added to the second plog.Logs.
// Notably, the Resource and Scope associated with the LogRecord are created in the second plog.Logs only once.
// Resources or Scopes are removed from the original if they become empty. All ordering is preserved.
func MoveRecordsWithContextIf(from, to plog.Logs, f func(plog.ResourceLogs, plog.ScopeLogs, plog.LogRecord) bool) {
rls := from.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
rl := rls.At(i)
sls := rl.ScopeLogs()
var rlCopy *plog.ResourceLogs
for j := 0; j < sls.Len(); j++ {
sl := sls.At(j)
lrs := sl.LogRecords()
var slCopy *plog.ScopeLogs
lrs.RemoveIf(func(lr plog.LogRecord) bool {
if !f(rl, sl, lr) {
return false
}
if rlCopy == nil {
rlc := to.ResourceLogs().AppendEmpty()
rlCopy = &rlc
rl.Resource().CopyTo(rlCopy.Resource())
rlCopy.SetSchemaUrl(rl.SchemaUrl())
}
if slCopy == nil {
slc := rlCopy.ScopeLogs().AppendEmpty()
slCopy = &slc
sl.Scope().CopyTo(slCopy.Scope())
slCopy.SetSchemaUrl(sl.SchemaUrl())
}
lr.CopyTo(slCopy.LogRecords().AppendEmpty())
return true
})
}
sls.RemoveIf(func(sl plog.ScopeLogs) bool {
return sl.LogRecords().Len() == 0
})
}
rls.RemoveIf(func(rl plog.ResourceLogs) bool {
return rl.ScopeLogs().Len() == 0
})
}
161 changes: 161 additions & 0 deletions connector/routingconnector/internal/plogutil/logs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package plogutil_test

import (
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/plog"

"github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector/internal/plogutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest"
)

func TestMoveResourcesIf(t *testing.T) {
testCases := []struct {
name string
condition func(plog.ResourceLogs) bool
}{
{
name: "move_none",
condition: func(plog.ResourceLogs) bool {
return false
},
},
{
name: "move_all",
condition: func(plog.ResourceLogs) bool {
return true
},
},
{
name: "move_one",
condition: func(rl plog.ResourceLogs) bool {
rname, ok := rl.Resource().Attributes().Get("resourceName")
return ok && rname.AsString() == "resourceA"
},
},
{
name: "move_to_preexisting",
condition: func(rl plog.ResourceLogs) bool {
rname, ok := rl.Resource().Attributes().Get("resourceName")
return ok && rname.AsString() == "resourceB"
},
},
}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
// Load up a fresh copy of the input for each test, since it may be modified in place.
from, err := golden.ReadLogs(filepath.Join("testdata", "resource", tt.name, "from.yaml"))
require.NoError(t, err)

to, err := golden.ReadLogs(filepath.Join("testdata", "resource", tt.name, "to.yaml"))
require.NoError(t, err)

fromModifed, err := golden.ReadLogs(filepath.Join("testdata", "resource", tt.name, "from_modified.yaml"))
require.NoError(t, err)

toModified, err := golden.ReadLogs(filepath.Join("testdata", "resource", tt.name, "to_modified.yaml"))
require.NoError(t, err)

plogutil.MoveResourcesIf(from, to, tt.condition)

assert.NoError(t, plogtest.CompareLogs(fromModifed, from), "from not modified as expected")
assert.NoError(t, plogtest.CompareLogs(toModified, to), "to not as expected")
})
}
}

func TestMoveRecordsWithContextIf(t *testing.T) {
testCases := []struct {
name string
condition func(plog.ResourceLogs, plog.ScopeLogs, plog.LogRecord) bool
}{
{
name: "move_none",
condition: func(plog.ResourceLogs, plog.ScopeLogs, plog.LogRecord) bool {
return false
},
},
{
name: "move_all",
condition: func(plog.ResourceLogs, plog.ScopeLogs, plog.LogRecord) bool {
return true
},
},
{
name: "move_all_from_one_resource",
condition: func(rl plog.ResourceLogs, _ plog.ScopeLogs, _ plog.LogRecord) bool {
rname, ok := rl.Resource().Attributes().Get("resourceName")
return ok && rname.AsString() == "resourceB"
},
},
{
name: "move_all_from_one_scope",
condition: func(rl plog.ResourceLogs, sl plog.ScopeLogs, _ plog.LogRecord) bool {
rname, ok := rl.Resource().Attributes().Get("resourceName")
return ok && rname.AsString() == "resourceB" && sl.Scope().Name() == "scopeA"
},
},
{
name: "move_all_from_one_scope_in_each_resource",
condition: func(_ plog.ResourceLogs, sl plog.ScopeLogs, _ plog.LogRecord) bool {
return sl.Scope().Name() == "scopeB"
},
},
{
name: "move_one",
condition: func(rl plog.ResourceLogs, sl plog.ScopeLogs, lr plog.LogRecord) bool {
rname, ok := rl.Resource().Attributes().Get("resourceName")
return ok && rname.AsString() == "resourceA" && sl.Scope().Name() == "scopeB" && lr.Body().AsString() == "logB"
},
},
{
name: "move_one_from_each_scope",
condition: func(_ plog.ResourceLogs, _ plog.ScopeLogs, lr plog.LogRecord) bool {
return lr.Body().AsString() == "logA"
},
},
{
name: "move_one_from_each_scope_in_one_resource",
condition: func(rl plog.ResourceLogs, _ plog.ScopeLogs, lr plog.LogRecord) bool {
rname, ok := rl.Resource().Attributes().Get("resourceName")
return ok && rname.AsString() == "resourceB" && lr.Body().AsString() == "logA"
},
},
{
name: "move_some_to_preexisting",
condition: func(_ plog.ResourceLogs, sl plog.ScopeLogs, _ plog.LogRecord) bool {
return sl.Scope().Name() == "scopeB"
},
},
}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
// Load up a fresh copy of the input for each test, since it may be modified in place.
from, err := golden.ReadLogs(filepath.Join("testdata", "record", tt.name, "from.yaml"))
require.NoError(t, err)

to, err := golden.ReadLogs(filepath.Join("testdata", "record", tt.name, "to.yaml"))
require.NoError(t, err)

fromModifed, err := golden.ReadLogs(filepath.Join("testdata", "record", tt.name, "from_modified.yaml"))
require.NoError(t, err)

toModified, err := golden.ReadLogs(filepath.Join("testdata", "record", tt.name, "to_modified.yaml"))
require.NoError(t, err)

plogutil.MoveRecordsWithContextIf(from, to, tt.condition)

assert.NoError(t, plogtest.CompareLogs(fromModifed, from), "from not modified as expected")
assert.NoError(t, plogtest.CompareLogs(toModified, to), "to not as expected")
})
}
}
Loading

0 comments on commit 8a43d36

Please sign in to comment.