Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pkg/stanza] Switch from SugaredLogger to Logger #32177

Merged
merged 1 commit into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/pkg-stanza-rm-sugared-api.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: The internal logger has been changed from zap.SugaredLogger to zap.Logger.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32177]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Functions accepting a SugaredLogger, and fields of type SugaredLogger, have been deprecated.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
27 changes: 27 additions & 0 deletions .chloggen/pkg-stanza-rm-sugared.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking
mx-psi marked this conversation as resolved.
Show resolved Hide resolved

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: filelog, journald, tcp, udp, syslog, windowseventlog receivers

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: The internal logger has been changed from zap.SugaredLogger to zap.Logger.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32177]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: This should not have any meaningful impact on most users but the logging format for some logs may have changed.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
14 changes: 8 additions & 6 deletions pkg/stanza/adapter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sync"

"github.com/cespare/xxhash/v2"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
Expand Down Expand Up @@ -52,6 +53,8 @@ import (
// │ downstream consumers via OutChannel() │
// └─────────────────────────────────────────────────────┘
type Converter struct {
set component.TelemetrySettings

// pLogsChan is a channel on which aggregated logs will be sent to.
pLogsChan chan plog.Logs

Expand All @@ -70,8 +73,6 @@ type Converter struct {
// wg is a WaitGroup that makes sure that we wait for spun up goroutines exit
// when Stop() is called.
wg sync.WaitGroup

logger *zap.Logger
}

type converterOption interface {
Expand All @@ -90,14 +91,15 @@ func (o workerCountOption) apply(c *Converter) {
c.workerCount = o.workerCount
}

func NewConverter(logger *zap.Logger, opts ...converterOption) *Converter {
func NewConverter(set component.TelemetrySettings, opts ...converterOption) *Converter {
set.Logger = set.Logger.With(zap.String("component", "converter"))
c := &Converter{
set: set,
workerChan: make(chan []*entry.Entry),
workerCount: int(math.Max(1, float64(runtime.NumCPU()/4))),
pLogsChan: make(chan plog.Logs),
stopChan: make(chan struct{}),
flushChan: make(chan plog.Logs),
logger: logger,
}
for _, opt := range opts {
opt.apply(c)
Expand All @@ -106,7 +108,7 @@ func NewConverter(logger *zap.Logger, opts ...converterOption) *Converter {
}

func (c *Converter) Start() {
c.logger.Debug("Starting log converter", zap.Int("worker_count", c.workerCount))
c.set.Logger.Debug("Starting log converter", zap.Int("worker_count", c.workerCount))

c.wg.Add(c.workerCount)
for i := 0; i < c.workerCount; i++ {
Expand Down Expand Up @@ -202,7 +204,7 @@ func (c *Converter) flushLoop() {

case pLogs := <-c.flushChan:
if err := c.flush(ctx, pLogs); err != nil {
c.logger.Debug("Problem sending log entries",
c.set.Logger.Debug("Problem sending log entries",
zap.Error(err),
)
}
Expand Down
20 changes: 14 additions & 6 deletions pkg/stanza/adapter/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
)
Expand Down Expand Up @@ -391,7 +392,9 @@ func TestAllConvertedEntriesScopeGrouping(t *testing.T) {
t.Run(strconv.Itoa(i), func(t *testing.T) {
t.Parallel()

converter := NewConverter(zap.NewNop())
set := componenttest.NewNopTelemetrySettings()
set.Logger = zaptest.NewLogger(t)
converter := NewConverter(set)
converter.Start()
defer converter.Stop()

Expand Down Expand Up @@ -458,7 +461,9 @@ func TestAllConvertedEntriesAreSentAndReceived(t *testing.T) {
t.Run(strconv.Itoa(i), func(t *testing.T) {
t.Parallel()

converter := NewConverter(zap.NewNop())
set := componenttest.NewNopTelemetrySettings()
set.Logger = zaptest.NewLogger(t)
converter := NewConverter(set)
converter.Start()
defer converter.Stop()

Expand Down Expand Up @@ -520,7 +525,9 @@ func TestAllConvertedEntriesAreSentAndReceived(t *testing.T) {
}

func TestConverterCancelledContextCancellsTheFlush(t *testing.T) {
converter := NewConverter(zap.NewNop())
set := componenttest.NewNopTelemetrySettings()
set.Logger = zaptest.NewLogger(t)
converter := NewConverter(set)
converter.Start()
defer converter.Stop()
var wg sync.WaitGroup
Expand Down Expand Up @@ -932,8 +939,9 @@ func BenchmarkConverter(b *testing.B) {
for _, wc := range workerCounts {
b.Run(fmt.Sprintf("worker_count=%d", wc), func(b *testing.B) {
for i := 0; i < b.N; i++ {

converter := NewConverter(zap.NewNop(), withWorkerCount(wc))
set := componenttest.NewNopTelemetrySettings()
set.Logger = zaptest.NewLogger(b)
converter := NewConverter(set, withWorkerCount(wc))
converter.Start()
defer converter.Stop()

Expand Down
7 changes: 4 additions & 3 deletions pkg/stanza/adapter/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@
package adapter // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"

import (
"go.opentelemetry.io/collector/component"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)

// Deprecated [v0.100.0] Use helper.LogEmitter directly instead
// Deprecated [v0.101.0] Use helper.LogEmitter directly instead
type LogEmitter = helper.LogEmitter

// Deprecated [v0.100.0] Use helper.NewLogEmitter directly instead
// Deprecated [v0.101.0] Use helper.NewLogEmitter directly instead
func NewLogEmitter(logger *zap.SugaredLogger, opts ...helper.EmitterOption) *LogEmitter {
return helper.NewLogEmitter(logger, opts...)
return helper.NewLogEmitter(component.TelemetrySettings{Logger: logger.Desugar()}, opts...)
}
6 changes: 3 additions & 3 deletions pkg/stanza/adapter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func createLogsReceiver(logReceiverType LogReceiverType) rcvr.CreateLogsFunc {
if baseCfg.flushInterval > 0 {
emitterOpts = append(emitterOpts, helper.WithFlushInterval(baseCfg.flushInterval))
}
emitter := helper.NewLogEmitter(params.Logger.Sugar(), emitterOpts...)
emitter := helper.NewLogEmitter(params.TelemetrySettings, emitterOpts...)
pipe, err := pipeline.Config{
Operators: operators,
DefaultOutput: emitter,
Expand All @@ -66,7 +66,7 @@ func createLogsReceiver(logReceiverType LogReceiverType) rcvr.CreateLogsFunc {
if baseCfg.numWorkers > 0 {
converterOpts = append(converterOpts, withWorkerCount(baseCfg.numWorkers))
}
converter := NewConverter(params.Logger, converterOpts...)
converter := NewConverter(params.TelemetrySettings, converterOpts...)
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
ReceiverID: params.ID,
ReceiverCreateSettings: params,
Expand All @@ -75,11 +75,11 @@ func createLogsReceiver(logReceiverType LogReceiverType) rcvr.CreateLogsFunc {
return nil, err
}
return &receiver{
set: params.TelemetrySettings,
id: params.ID,
pipe: pipe,
emitter: emitter,
consumer: consumerretry.NewLogs(baseCfg.RetryOnFailure, params.Logger, nextConsumer),
logger: params.Logger,
converter: converter,
obsrecv: obsrecv,
storageID: baseCfg.StorageID,
Expand Down
15 changes: 8 additions & 7 deletions pkg/stanza/adapter/frompdataconverter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
Expand Down Expand Up @@ -38,6 +39,8 @@ import (
// └─┤ and sends them along entriesChan │
// └───────────────────────────────────────────────────┘
type FromPdataConverter struct {
set component.TelemetrySettings

// entriesChan is a channel on which converted logs will be sent out of the converter.
entriesChan chan []*entry.Entry

Expand All @@ -51,28 +54,26 @@ type FromPdataConverter struct {
// wg is a WaitGroup that makes sure that we wait for spun up goroutines exit
// when Stop() is called.
wg sync.WaitGroup

logger *zap.Logger
}

func NewFromPdataConverter(workerCount int, logger *zap.Logger) *FromPdataConverter {
if logger == nil {
logger = zap.NewNop()
func NewFromPdataConverter(set component.TelemetrySettings, workerCount int) *FromPdataConverter {
if set.Logger == nil {
set.Logger = zap.NewNop()
}
if workerCount <= 0 {
workerCount = int(math.Max(1, float64(runtime.NumCPU())))
}

return &FromPdataConverter{
set: set,
workerChan: make(chan fromConverterWorkerItem, workerCount),
entriesChan: make(chan []*entry.Entry),
stopChan: make(chan struct{}),
logger: logger,
}
}

func (c *FromPdataConverter) Start() {
c.logger.Debug("Starting log converter from pdata", zap.Int("worker_count", cap(c.workerChan)))
c.set.Logger.Debug("Starting log converter from pdata", zap.Int("worker_count", cap(c.workerChan)))

for i := 0; i < cap(c.workerChan); i++ {
c.wg.Add(1)
Expand Down
3 changes: 2 additions & 1 deletion pkg/stanza/adapter/frompdataconverter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"

Expand Down Expand Up @@ -126,7 +127,7 @@ func BenchmarkFromPdataConverter(b *testing.B) {
b.Run(fmt.Sprintf("worker_count=%d", wc), func(b *testing.B) {
for i := 0; i < b.N; i++ {

converter := NewFromPdataConverter(wc, nil)
converter := NewFromPdataConverter(componenttest.NewNopTelemetrySettings(), wc)
converter.Start()
defer converter.Stop()
b.ResetTimer()
Expand Down
8 changes: 4 additions & 4 deletions pkg/stanza/adapter/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
)

func createNoopReceiver(nextConsumer consumer.Logs) (*receiver, error) {
emitter := helper.NewLogEmitter(zap.NewNop().Sugar())

set := componenttest.NewNopTelemetrySettings()
set.Logger = zap.NewNop()
emitter := helper.NewLogEmitter(set)
pipe, err := pipeline.Config{
Operators: []operator.Config{
{
Expand All @@ -48,12 +48,12 @@ func createNoopReceiver(nextConsumer consumer.Logs) (*receiver, error) {
}

return &receiver{
set: set,
id: component.MustNewID("testReceiver"),
pipe: pipe,
emitter: emitter,
consumer: nextConsumer,
logger: zap.NewNop(),
converter: NewConverter(zap.NewNop()),
converter: NewConverter(componenttest.NewNopTelemetrySettings()),
obsrecv: obsrecv,
}, nil
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/stanza/adapter/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
)

type receiver struct {
set component.TelemetrySettings
id component.ID
wg sync.WaitGroup
cancel context.CancelFunc
Expand All @@ -29,7 +30,6 @@ type receiver struct {
emitter *helper.LogEmitter
consumer consumer.Logs
converter *Converter
logger *zap.Logger
obsrecv *receiverhelper.ObsReport

storageID *component.ID
Expand All @@ -43,7 +43,7 @@ var _ rcvr.Logs = (*receiver)(nil)
func (r *receiver) Start(ctx context.Context, host component.Host) error {
rctx, cancel := context.WithCancel(ctx)
r.cancel = cancel
r.logger.Info("Starting stanza receiver")
r.set.Logger.Info("Starting stanza receiver")

if err := r.setStorageClient(ctx, host); err != nil {
return fmt.Errorf("storage client: %w", err)
Expand Down Expand Up @@ -88,7 +88,7 @@ func (r *receiver) emitterLoop(ctx context.Context) {
for {
select {
case <-doneChan:
r.logger.Debug("Receive loop stopped")
r.set.Logger.Debug("Receive loop stopped")
return

case e, ok := <-r.emitter.OutChannel():
Expand All @@ -97,7 +97,7 @@ func (r *receiver) emitterLoop(ctx context.Context) {
}

if err := r.converter.Batch(e); err != nil {
r.logger.Error("Could not add entry to batch", zap.Error(err))
r.set.Logger.Error("Could not add entry to batch", zap.Error(err))
}
}
}
Expand All @@ -113,19 +113,19 @@ func (r *receiver) consumerLoop(ctx context.Context) {
for {
select {
case <-doneChan:
r.logger.Debug("Consumer loop stopped")
r.set.Logger.Debug("Consumer loop stopped")
return

case pLogs, ok := <-pLogsChan:
if !ok {
r.logger.Debug("Converter channel got closed")
r.set.Logger.Debug("Converter channel got closed")
continue
}
obsrecvCtx := r.obsrecv.StartLogsOp(ctx)
logRecordCount := pLogs.LogRecordCount()
cErr := r.consumer.ConsumeLogs(ctx, pLogs)
if cErr != nil {
r.logger.Error("ConsumeLogs() failed", zap.Error(cErr))
r.set.Logger.Error("ConsumeLogs() failed", zap.Error(cErr))
}
r.obsrecv.EndLogsOp(obsrecvCtx, "stanza", logRecordCount, cErr)
}
Expand All @@ -138,7 +138,7 @@ func (r *receiver) Shutdown(ctx context.Context) error {
return nil
}

r.logger.Info("Stopping stanza receiver")
r.set.Logger.Info("Stopping stanza receiver")
pipelineErr := r.pipe.Stop()
r.converter.Stop()
r.cancel()
Expand Down
Loading
Loading