Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][connector/routing] Add internal package plogutil for splitting logs #35957

Merged
merged 1 commit into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions connector/routingconnector/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/connector/routi
go 1.22.0

require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.112.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.112.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.112.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.112.0
go.opentelemetry.io/collector/confmap v1.18.0
Expand All @@ -21,6 +23,7 @@ require (
github.com/alecthomas/participle/v2 v2.1.1 // indirect
github.com/antchfx/xmlquery v1.4.2 // indirect
github.com/antchfx/xpath v1.3.2 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/elastic/go-grok v0.3.1 // indirect
github.com/elastic/lunes v0.1.0 // indirect
Expand All @@ -44,6 +47,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.112.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.112.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/ua-parser/uap-go v0.0.0-20240611065828-3a4781585db6 // indirect
go.opentelemetry.io/collector v0.112.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions connector/routingconnector/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 63 additions & 0 deletions connector/routingconnector/internal/plogutil/logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package plogutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector/internal/plogutil"

import (
"go.opentelemetry.io/collector/pdata/plog"
)

// MoveResourcesIf calls f sequentially for each ResourceLogs present in the first plog.Logs.
// If f returns true, the element is removed from the first plog.Logs and added to the second plog.Logs.
func MoveResourcesIf(from, to plog.Logs, f func(plog.ResourceLogs) bool) {
from.ResourceLogs().RemoveIf(func(rl plog.ResourceLogs) bool {
if !f(rl) {
return false
}
rl.CopyTo(to.ResourceLogs().AppendEmpty())
return true
})
}

// MoveRecordsWithContextIf calls f sequentially for each LogRecord present in the first plog.Logs.
// If f returns true, the element is removed from the first plog.Logs and added to the second plog.Logs.
// Notably, the Resource and Scope associated with the LogRecord are created in the second plog.Logs only once.
// Resources or Scopes are removed from the original if they become empty. All ordering is preserved.
func MoveRecordsWithContextIf(from, to plog.Logs, f func(plog.ResourceLogs, plog.ScopeLogs, plog.LogRecord) bool) {
rls := from.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
rl := rls.At(i)
sls := rl.ScopeLogs()
var rlCopy *plog.ResourceLogs
for j := 0; j < sls.Len(); j++ {
sl := sls.At(j)
lrs := sl.LogRecords()
var slCopy *plog.ScopeLogs
lrs.RemoveIf(func(lr plog.LogRecord) bool {
if !f(rl, sl, lr) {
return false
}
if rlCopy == nil {
rlc := to.ResourceLogs().AppendEmpty()
rlCopy = &rlc
rl.Resource().CopyTo(rlCopy.Resource())
rlCopy.SetSchemaUrl(rl.SchemaUrl())
}
if slCopy == nil {
slc := rlCopy.ScopeLogs().AppendEmpty()
slCopy = &slc
sl.Scope().CopyTo(slCopy.Scope())
slCopy.SetSchemaUrl(sl.SchemaUrl())
}
lr.CopyTo(slCopy.LogRecords().AppendEmpty())
return true
})
}
sls.RemoveIf(func(sl plog.ScopeLogs) bool {
return sl.LogRecords().Len() == 0
})
}
rls.RemoveIf(func(rl plog.ResourceLogs) bool {
return rl.ScopeLogs().Len() == 0
})
}
161 changes: 161 additions & 0 deletions connector/routingconnector/internal/plogutil/logs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package plogutil_test

import (
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/plog"

"github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector/internal/plogutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest"
)

func TestMoveResourcesIf(t *testing.T) {
testCases := []struct {
name string
condition func(plog.ResourceLogs) bool
}{
{
name: "move_none",
condition: func(plog.ResourceLogs) bool {
return false
},
},
{
name: "move_all",
condition: func(plog.ResourceLogs) bool {
return true
},
},
{
name: "move_one",
condition: func(rl plog.ResourceLogs) bool {
rname, ok := rl.Resource().Attributes().Get("resourceName")
return ok && rname.AsString() == "resourceA"
},
},
{
name: "move_to_preexisting",
condition: func(rl plog.ResourceLogs) bool {
rname, ok := rl.Resource().Attributes().Get("resourceName")
return ok && rname.AsString() == "resourceB"
},
},
}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
// Load up a fresh copy of the input for each test, since it may be modified in place.
from, err := golden.ReadLogs(filepath.Join("testdata", "resource", tt.name, "from.yaml"))
require.NoError(t, err)

to, err := golden.ReadLogs(filepath.Join("testdata", "resource", tt.name, "to.yaml"))
require.NoError(t, err)

fromModifed, err := golden.ReadLogs(filepath.Join("testdata", "resource", tt.name, "from_modified.yaml"))
require.NoError(t, err)

toModified, err := golden.ReadLogs(filepath.Join("testdata", "resource", tt.name, "to_modified.yaml"))
require.NoError(t, err)

plogutil.MoveResourcesIf(from, to, tt.condition)

assert.NoError(t, plogtest.CompareLogs(fromModifed, from), "from not modified as expected")
assert.NoError(t, plogtest.CompareLogs(toModified, to), "to not as expected")
})
}
}

func TestMoveRecordsWithContextIf(t *testing.T) {
testCases := []struct {
name string
condition func(plog.ResourceLogs, plog.ScopeLogs, plog.LogRecord) bool
}{
{
name: "move_none",
condition: func(plog.ResourceLogs, plog.ScopeLogs, plog.LogRecord) bool {
return false
},
},
{
name: "move_all",
condition: func(plog.ResourceLogs, plog.ScopeLogs, plog.LogRecord) bool {
return true
},
},
{
name: "move_all_from_one_resource",
condition: func(rl plog.ResourceLogs, _ plog.ScopeLogs, _ plog.LogRecord) bool {
rname, ok := rl.Resource().Attributes().Get("resourceName")
return ok && rname.AsString() == "resourceB"
},
},
{
name: "move_all_from_one_scope",
condition: func(rl plog.ResourceLogs, sl plog.ScopeLogs, _ plog.LogRecord) bool {
rname, ok := rl.Resource().Attributes().Get("resourceName")
return ok && rname.AsString() == "resourceB" && sl.Scope().Name() == "scopeA"
},
},
{
name: "move_all_from_one_scope_in_each_resource",
condition: func(_ plog.ResourceLogs, sl plog.ScopeLogs, _ plog.LogRecord) bool {
return sl.Scope().Name() == "scopeB"
},
},
{
name: "move_one",
condition: func(rl plog.ResourceLogs, sl plog.ScopeLogs, lr plog.LogRecord) bool {
rname, ok := rl.Resource().Attributes().Get("resourceName")
return ok && rname.AsString() == "resourceA" && sl.Scope().Name() == "scopeB" && lr.Body().AsString() == "logB"
},
},
{
name: "move_one_from_each_scope",
condition: func(_ plog.ResourceLogs, _ plog.ScopeLogs, lr plog.LogRecord) bool {
return lr.Body().AsString() == "logA"
},
},
{
name: "move_one_from_each_scope_in_one_resource",
condition: func(rl plog.ResourceLogs, _ plog.ScopeLogs, lr plog.LogRecord) bool {
rname, ok := rl.Resource().Attributes().Get("resourceName")
return ok && rname.AsString() == "resourceB" && lr.Body().AsString() == "logA"
},
},
{
name: "move_some_to_preexisting",
condition: func(_ plog.ResourceLogs, sl plog.ScopeLogs, _ plog.LogRecord) bool {
return sl.Scope().Name() == "scopeB"
},
},
}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
// Load up a fresh copy of the input for each test, since it may be modified in place.
from, err := golden.ReadLogs(filepath.Join("testdata", "record", tt.name, "from.yaml"))
require.NoError(t, err)

to, err := golden.ReadLogs(filepath.Join("testdata", "record", tt.name, "to.yaml"))
require.NoError(t, err)

fromModifed, err := golden.ReadLogs(filepath.Join("testdata", "record", tt.name, "from_modified.yaml"))
require.NoError(t, err)

toModified, err := golden.ReadLogs(filepath.Join("testdata", "record", tt.name, "to_modified.yaml"))
require.NoError(t, err)

plogutil.MoveRecordsWithContextIf(from, to, tt.condition)

assert.NoError(t, plogtest.CompareLogs(fromModifed, from), "from not modified as expected")
assert.NoError(t, plogtest.CompareLogs(toModified, to), "to not as expected")
})
}
}
Loading
Loading