From 7afad781d7aada3ba17f062947f3d6fb44143acc Mon Sep 17 00:00:00 2001 From: Piotr <17101802+thampiotr@users.noreply.github.com> Date: Thu, 29 Jun 2023 16:15:12 +0100 Subject: [PATCH] Make loki.LogsReceiver an interface (#4303) --- component/common/loki/client/fake/client.go | 6 +-- component/common/loki/types.go | 28 +++++++++++-- component/loki/echo/echo.go | 5 +-- component/loki/process/process.go | 8 ++-- component/loki/process/process_test.go | 30 +++++++------- component/loki/relabel/relabel.go | 6 +-- component/loki/relabel/relabel_test.go | 40 +++++++++---------- component/loki/source/api/api.go | 2 +- component/loki/source/api/api_test.go | 2 +- .../loki/source/aws_firehose/component.go | 8 ++-- .../source/aws_firehose/component_test.go | 8 ++-- .../azure_event_hubs/azure_event_hubs.go | 8 ++-- .../loki/source/cloudflare/cloudflare.go | 8 ++-- component/loki/source/docker/docker.go | 8 ++-- component/loki/source/file/file.go | 10 ++--- component/loki/source/file/file_test.go | 10 ++--- component/loki/source/gcplog/gcplog.go | 8 ++-- component/loki/source/gcplog/gcplog_test.go | 6 +-- component/loki/source/gelf/gelf.go | 2 +- component/loki/source/gelf/gelf_test.go | 4 +- component/loki/source/heroku/heroku.go | 8 ++-- component/loki/source/heroku/heroku_test.go | 8 ++-- component/loki/source/journal/journal.go | 2 +- component/loki/source/journal/journal_test.go | 4 +- component/loki/source/kafka/kafka.go | 8 ++-- .../loki/source/kubernetes/kubernetes.go | 8 ++-- .../kubernetes_events/event_controller.go | 2 +- .../kubernetes_events/kubernetes_events.go | 6 +-- component/loki/source/podlogs/podlogs.go | 8 ++-- component/loki/source/syslog/syslog.go | 8 ++-- component/loki/source/syslog/syslog_test.go | 14 +++---- .../source/windowsevent/component_test.go | 2 +- .../source/windowsevent/component_windows.go | 2 +- component/loki/write/write.go | 4 +- component/loki/write/write_test.go | 4 +- .../exporter/loki/internal/convert/convert.go | 2 +- .../loki/internal/convert/convert_test.go | 6 +-- component/otelcol/receiver/loki/loki.go | 4 +- component/otelcol/receiver/loki/loki_test.go | 2 +- 39 files changed, 164 insertions(+), 145 deletions(-) diff --git a/component/common/loki/client/fake/client.go b/component/common/loki/client/fake/client.go index 4ec84117b853..5f2fc61319e4 100644 --- a/component/common/loki/client/fake/client.go +++ b/component/common/loki/client/fake/client.go @@ -11,7 +11,7 @@ import ( // Client is a fake client used for testing. type Client struct { - entries loki.LogsReceiver + entries chan loki.Entry received []loki.Entry once sync.Once mtx sync.Mutex @@ -22,7 +22,7 @@ type Client struct { func NewClient(stop func()) *Client { c := &Client{ OnStop: stop, - entries: make(loki.LogsReceiver), + entries: make(chan loki.Entry), } c.wg.Add(1) go func() { @@ -49,7 +49,7 @@ func (c *Client) Chan() chan<- loki.Entry { // LogsReceiver returns this client as a LogsReceiver, which is useful in testing. func (c *Client) LogsReceiver() loki.LogsReceiver { - return c.entries + return loki.NewLogsReceiverWithChannel(c.entries) } func (c *Client) Received() []loki.Entry { diff --git a/component/common/loki/types.go b/component/common/loki/types.go index eb52d95ebbea..f3d31b8d728f 100644 --- a/component/common/loki/types.go +++ b/component/common/loki/types.go @@ -23,9 +23,29 @@ import ( // to an outage or erroring (such as limits being hit). const finalEntryTimeout = 5 * time.Second -// LogsReceiver is an alias for chan Entry which is used for component +// LogsReceiver is an interface providing `chan Entry` which is used for component // communication. -type LogsReceiver chan Entry +type LogsReceiver interface { + Chan() chan Entry +} + +type logsReceiver struct { + entries chan Entry +} + +func (l *logsReceiver) Chan() chan Entry { + return l.entries +} + +func NewLogsReceiver() LogsReceiver { + return NewLogsReceiverWithChannel(make(chan Entry)) +} + +func NewLogsReceiverWithChannel(c chan Entry) LogsReceiver { + return &logsReceiver{ + entries: c, + } +} // Entry is a log entry with labels. type Entry struct { @@ -55,7 +75,7 @@ type EntryHandler interface { } // EntryMiddleware takes an EntryHandler and returns another one that will intercept and forward entries. -// The newly created EntryHandler should be Stopped independently from the original one. +// The newly created EntryHandler should be Stopped independently of the original one. type EntryMiddleware interface { Wrap(EntryHandler) EntryHandler } @@ -133,7 +153,7 @@ func NewEntryMutatorHandler(next EntryHandler, f EntryMutatorFunc) EntryHandler select { case <-ctx.Done(): - // The goroutine above exited on its own so we don't have to wait for + // The goroutine above exited on its own, so we don't have to wait for // the timeout. case <-time.After(finalEntryTimeout): // We reached the timeout for sending the final entry to nextChan; diff --git a/component/loki/echo/echo.go b/component/loki/echo/echo.go index 3dc1e4b9f3a5..bf40efeec86c 100644 --- a/component/loki/echo/echo.go +++ b/component/loki/echo/echo.go @@ -53,10 +53,9 @@ type Component struct { // New creates a new loki.echo component. func New(o component.Options, args Arguments) (*Component, error) { - ch := make(chan loki.Entry) c := &Component{ opts: o, - receiver: ch, + receiver: loki.NewLogsReceiver(), } // Call to Update() once at the start. @@ -77,7 +76,7 @@ func (c *Component) Run(ctx context.Context) error { select { case <-ctx.Done(): return nil - case entry := <-c.receiver: + case entry := <-c.receiver.Chan(): level.Info(c.opts.Logger).Log("receiver", c.opts.ID, "entry", entry.Line, "labels", entry.Labels.String()) } } diff --git a/component/loki/process/process.go b/component/loki/process/process.go index 5114c4044954..e14cdc8200e5 100644 --- a/component/loki/process/process.go +++ b/component/loki/process/process.go @@ -59,8 +59,8 @@ func New(o component.Options, args Arguments) (*Component, error) { // Create and immediately export the receiver which remains the same for // the component's lifetime. - c.receiver = make(loki.LogsReceiver) - c.processOut = make(loki.LogsReceiver) + c.receiver = loki.NewLogsReceiver() + c.processOut = make(chan loki.Entry) o.OnStateChange(Exports{Receiver: c.receiver}) // Call to Update() to start readers and set receivers once at the start. @@ -126,7 +126,7 @@ func (c *Component) handleIn(ctx context.Context, wg *sync.WaitGroup) { select { case <-ctx.Done(): return - case entry := <-c.receiver: + case entry := <-c.receiver.Chan(): c.mut.RLock() select { case <-ctx.Done(): @@ -155,7 +155,7 @@ func (c *Component) handleOut(ctx context.Context, wg *sync.WaitGroup) { select { case <-ctx.Done(): return - case f <- entry: + case f.Chan() <- entry: // no-op } } diff --git a/component/loki/process/process_test.go b/component/loki/process/process_test.go index 81bf09dd7552..998648c3afc2 100644 --- a/component/loki/process/process_test.go +++ b/component/loki/process/process_test.go @@ -67,7 +67,7 @@ func TestJSONLabelsStage(t *testing.T) { err := river.Unmarshal([]byte(stg), &stagesCfg) require.NoError(t, err) - ch1, ch2 := make(loki.LogsReceiver), make(loki.LogsReceiver) + ch1, ch2 := loki.NewLogsReceiver(), loki.NewLogsReceiver() // Create and run the component, so that it can process and forwards logs. opts := component.Options{ @@ -97,7 +97,7 @@ func TestJSONLabelsStage(t *testing.T) { }, } - c.receiver <- logEntry + c.receiver.Chan() <- logEntry wantLabelSet := model.LabelSet{ "filename": "/var/log/pods/agent/agent/1.log", @@ -111,11 +111,11 @@ func TestJSONLabelsStage(t *testing.T) { // stages correctly applied. for i := 0; i < 2; i++ { select { - case logEntry := <-ch1: + case logEntry := <-ch1.Chan(): require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second) require.Equal(t, logline, logEntry.Line) require.Equal(t, wantLabelSet, logEntry.Labels) - case logEntry := <-ch2: + case logEntry := <-ch2.Chan(): require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second) require.Equal(t, logline, logEntry.Line) require.Equal(t, wantLabelSet, logEntry.Labels) @@ -154,7 +154,7 @@ stage.label_keep { err := river.Unmarshal([]byte(stg), &stagesCfg) require.NoError(t, err) - ch1, ch2 := make(loki.LogsReceiver), make(loki.LogsReceiver) + ch1, ch2 := loki.NewLogsReceiver(), loki.NewLogsReceiver() // Create and run the component, so that it can process and forwards logs. opts := component.Options{ @@ -184,7 +184,7 @@ stage.label_keep { }, } - c.receiver <- logEntry + c.receiver.Chan() <- logEntry wantLabelSet := model.LabelSet{ "filename": "/var/log/pods/agent/agent/1.log", @@ -195,11 +195,11 @@ stage.label_keep { // stages correctly applied. for i := 0; i < 2; i++ { select { - case logEntry := <-ch1: + case logEntry := <-ch1.Chan(): require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second) require.Equal(t, logline, logEntry.Line) require.Equal(t, wantLabelSet, logEntry.Labels) - case logEntry := <-ch2: + case logEntry := <-ch2.Chan(): require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second) require.Equal(t, logline, logEntry.Line) require.Equal(t, wantLabelSet, logEntry.Labels) @@ -249,7 +249,7 @@ stage.labels { err := river.Unmarshal([]byte(stg), &stagesCfg) require.NoError(t, err) - ch1, ch2 := make(loki.LogsReceiver), make(loki.LogsReceiver) + ch1, ch2 := loki.NewLogsReceiver(), loki.NewLogsReceiver() // Create and run the component, so that it can process and forwards logs. opts := component.Options{ @@ -279,7 +279,7 @@ stage.labels { }, } - c.receiver <- logEntry + c.receiver.Chan() <- logEntry wantLabelSet := model.LabelSet{ "filename": "/var/log/pods/agent/agent/1.log", @@ -294,11 +294,11 @@ stage.labels { // stages correctly applied. for i := 0; i < 2; i++ { select { - case logEntry := <-ch1: + case logEntry := <-ch1.Chan(): require.Equal(t, wantLogline, logEntry.Line) require.Equal(t, wantTimestamp, logEntry.Timestamp) require.Equal(t, wantLabelSet, logEntry.Labels) - case logEntry := <-ch2: + case logEntry := <-ch2.Chan(): require.Equal(t, wantLogline, logEntry.Line) require.Equal(t, wantTimestamp, logEntry.Timestamp) require.Equal(t, wantLabelSet, logEntry.Labels) @@ -323,7 +323,7 @@ stage.static_labels { } ` - ch1, ch2 := make(loki.LogsReceiver), make(loki.LogsReceiver) + ch1, ch2 := loki.NewLogsReceiver(), loki.NewLogsReceiver() var args1, args2 Arguments require.NoError(t, river.Unmarshal([]byte(stg1), &args1)) require.NoError(t, river.Unmarshal([]byte(stg2), &args2)) @@ -374,11 +374,11 @@ stage.static_labels { // race condition between them. for i := 0; i < 2; i++ { select { - case logEntry := <-ch1: + case logEntry := <-ch1.Chan(): require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second) require.Equal(t, "writing some text", logEntry.Line) require.Equal(t, wantLabelSet.Clone().Merge(model.LabelSet{"lbl": "foo"}), logEntry.Labels) - case logEntry := <-ch2: + case logEntry := <-ch2.Chan(): require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second) require.Equal(t, "writing some text", logEntry.Line) require.Equal(t, wantLabelSet.Clone().Merge(model.LabelSet{"lbl": "bar"}), logEntry.Labels) diff --git a/component/loki/relabel/relabel.go b/component/loki/relabel/relabel.go index b73c0ae0b6d8..50fa15c9f06b 100644 --- a/component/loki/relabel/relabel.go +++ b/component/loki/relabel/relabel.go @@ -90,7 +90,7 @@ func New(o component.Options, args Arguments) (*Component, error) { // Create and immediately export the receiver which remains the same for // the component's lifetime. - c.receiver = make(loki.LogsReceiver) + c.receiver = loki.NewLogsReceiver() o.OnStateChange(Exports{Receiver: c.receiver, Rules: args.RelabelConfigs}) // Call to Update() to set the relabelling rules once at the start. @@ -107,7 +107,7 @@ func (c *Component) Run(ctx context.Context) error { select { case <-ctx.Done(): return nil - case entry := <-c.receiver: + case entry := <-c.receiver.Chan(): c.metrics.entriesProcessed.Inc() lbls := c.relabel(entry) if len(lbls) == 0 { @@ -121,7 +121,7 @@ func (c *Component) Run(ctx context.Context) error { select { case <-ctx.Done(): return nil - case f <- entry: + case f.Chan() <- entry: } } } diff --git a/component/loki/relabel/relabel_test.go b/component/loki/relabel/relabel_test.go index a50519694f00..8aa9ba35d1ec 100644 --- a/component/loki/relabel/relabel_test.go +++ b/component/loki/relabel/relabel_test.go @@ -51,7 +51,7 @@ func TestRelabeling(t *testing.T) { err := river.Unmarshal([]byte(rc), &relabelConfigs) require.NoError(t, err) - ch1, ch2 := make(loki.LogsReceiver), make(loki.LogsReceiver) + ch1, ch2 := loki.NewLogsReceiver(), loki.NewLogsReceiver() // Create and run the component, so that it relabels and forwards logs. opts := component.Options{ @@ -78,7 +78,7 @@ func TestRelabeling(t *testing.T) { }, } - c.receiver <- logEntry + c.receiver.Chan() <- logEntry wantLabelSet := model.LabelSet{ "filename": "/var/log/pods/agent/agent/1.log", @@ -92,11 +92,11 @@ func TestRelabeling(t *testing.T) { // rules correctly applied. for i := 0; i < 2; i++ { select { - case logEntry := <-ch1: + case logEntry := <-ch1.Chan(): require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second) require.Equal(t, "very important log", logEntry.Line) require.Equal(t, wantLabelSet, logEntry.Labels) - case logEntry := <-ch2: + case logEntry := <-ch2.Chan(): require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second) require.Equal(t, "very important log", logEntry.Line) require.Equal(t, wantLabelSet, logEntry.Labels) @@ -112,7 +112,7 @@ func BenchmarkRelabelComponent(b *testing.B) { } var relabelConfigs cfg _ = river.Unmarshal([]byte(rc), &relabelConfigs) - ch1 := make(loki.LogsReceiver) + ch1 := loki.NewLogsReceiver() // Create and run the component, so that it relabels and forwards logs. opts := component.Options{ @@ -132,14 +132,14 @@ func BenchmarkRelabelComponent(b *testing.B) { var entry loki.Entry go func() { - for e := range ch1 { + for e := range ch1.Chan() { entry = e } }() now := time.Now() for i := 0; i < b.N; i++ { - c.receiver <- loki.Entry{ + c.receiver.Chan() <- loki.Entry{ Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/%d.log", "kubernetes_namespace": "dev", "kubernetes_pod_name": model.LabelValue(fmt.Sprintf("agent-%d", i)), "foo": "bar"}, Entry: logproto.Entry{ Timestamp: now, @@ -160,7 +160,7 @@ func TestCache(t *testing.T) { err := river.Unmarshal([]byte(rc), &relabelConfigs) require.NoError(t, err) - ch1 := make(loki.LogsReceiver) + ch1 := loki.NewLogsReceiver() // Create and run the component, so that it relabels and forwards logs. opts := component.Options{ @@ -187,7 +187,7 @@ func TestCache(t *testing.T) { go c.Run(context.Background()) go func() { - for e := range ch1 { + for e := range ch1.Chan() { require.Equal(t, "very important log", e.Line) } }() @@ -210,11 +210,11 @@ func TestCache(t *testing.T) { } // Send three entries with different label sets along the receiver. e.Labels = lsets[0] - c.receiver <- e + c.receiver.Chan() <- e e.Labels = lsets[1] - c.receiver <- e + c.receiver.Chan() <- e e.Labels = lsets[2] - c.receiver <- e + c.receiver.Chan() <- e time.Sleep(100 * time.Millisecond) // Let's look into the cache's structure now! @@ -236,7 +236,7 @@ func TestCache(t *testing.T) { // We should've hit the cached path, with no changes to the cache's length // or the underlying stored value. e.Labels = lsets[0] - c.receiver <- e + c.receiver.Chan() <- e require.Equal(t, c.cache.Len(), 3) val, _ := c.cache.Get(lsets[0].Fingerprint()) cachedVal := val.([]cacheItem) @@ -253,10 +253,10 @@ func TestCache(t *testing.T) { require.Equal(t, ls1.Fingerprint(), ls2.Fingerprint(), "expected labelset fingerprints to collide; have we changed the hashing algorithm?") e.Labels = ls1 - c.receiver <- e + c.receiver.Chan() <- e e.Labels = ls2 - c.receiver <- e + c.receiver.Chan() <- e time.Sleep(100 * time.Millisecond) // Both of these should be under a single, new cache key which will contain @@ -275,9 +275,9 @@ func TestCache(t *testing.T) { // Finally, send two more entries, which should fill up the cache and evict // the Least Recently Used items (lsets[1], and lsets[2]). e.Labels = lsets[3] - c.receiver <- e + c.receiver.Chan() <- e e.Labels = lsets[4] - c.receiver <- e + c.receiver.Chan() <- e require.Equal(t, c.cache.Len(), 4) wantKeys := []model.Fingerprint{lsets[0].Fingerprint(), ls1.Fingerprint(), lsets[3].Fingerprint(), lsets[4].Fingerprint()} @@ -307,7 +307,7 @@ rule { } ` - ch1, ch2 := make(loki.LogsReceiver), make(loki.LogsReceiver) + ch1, ch2 := loki.NewLogsReceiver(), loki.NewLogsReceiver() var args1, args2 Arguments require.NoError(t, river.Unmarshal([]byte(stg1), &args1)) require.NoError(t, river.Unmarshal([]byte(stg2), &args2)) @@ -364,11 +364,11 @@ rule { // The two entries have been modified without a race condition. for i := 0; i < 2; i++ { select { - case logEntry := <-ch1: + case logEntry := <-ch1.Chan(): require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second) require.Equal(t, "writing some text", logEntry.Line) require.Equal(t, wantLabelSet.Clone().Merge(model.LabelSet{"lbl": "foo"}), logEntry.Labels) - case logEntry := <-ch2: + case logEntry := <-ch2.Chan(): require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second) require.Equal(t, "writing some text", logEntry.Line) require.Equal(t, wantLabelSet.Clone().Merge(model.LabelSet{"lbl": "bar"}), logEntry.Labels) diff --git a/component/loki/source/api/api.go b/component/loki/source/api/api.go index c1a93c7472ea..29eef02c0712 100644 --- a/component/loki/source/api/api.go +++ b/component/loki/source/api/api.go @@ -90,7 +90,7 @@ func (c *Component) Run(ctx context.Context) (err error) { for _, receiver := range receivers { select { - case receiver <- entry: + case receiver.Chan() <- entry: case <-ctx.Done(): return } diff --git a/component/loki/source/api/api_test.go b/component/loki/source/api/api_test.go index 769eb13dda70..0632ef4f5468 100644 --- a/component/loki/source/api/api_test.go +++ b/component/loki/source/api/api_test.go @@ -415,7 +415,7 @@ func testArgsWithPorts(httpPort int, grpcPort int) Arguments { ListenPort: grpcPort, }, }, - ForwardTo: []loki.LogsReceiver{make(chan loki.Entry), make(chan loki.Entry)}, + ForwardTo: []loki.LogsReceiver{loki.NewLogsReceiver(), loki.NewLogsReceiver()}, Labels: map[string]string{"foo": "bar", "fizz": "buzz"}, RelabelRules: relabel.Rules{ { diff --git a/component/loki/source/aws_firehose/component.go b/component/loki/source/aws_firehose/component.go index 8d14301f0f33..639f8e6c7c16 100644 --- a/component/loki/source/aws_firehose/component.go +++ b/component/loki/source/aws_firehose/component.go @@ -69,7 +69,7 @@ type Component struct { func New(o component.Options, args Arguments) (*Component, error) { c := &Component{ opts: o, - destination: make(loki.LogsReceiver), + destination: loki.NewLogsReceiver(), fanout: args.ForwardTo, serverMetrics: util.NewUncheckedCollector(nil), handlerMetrics: internal.NewMetrics(o.Registerer), @@ -98,10 +98,10 @@ func (c *Component) Run(ctx context.Context) error { select { case <-ctx.Done(): return nil - case entry := <-c.destination: + case entry := <-c.destination.Chan(): c.mut.RLock() for _, receiver := range c.fanout { - receiver <- entry + receiver.Chan() <- entry } c.mut.RUnlock() } @@ -171,7 +171,7 @@ func (c *Component) Update(args component.Arguments) error { // Send implements internal.Sender so that the component is able to receive logs decoded by the handler. func (c *Component) Send(ctx context.Context, entry loki.Entry) { - c.destination <- entry + c.destination.Chan() <- entry } // shutdownServer will shut down the currently used server. diff --git a/component/loki/source/aws_firehose/component_test.go b/component/loki/source/aws_firehose/component_test.go index f8e752be037a..7d5517cd76c8 100644 --- a/component/loki/source/aws_firehose/component_test.go +++ b/component/loki/source/aws_firehose/component_test.go @@ -64,8 +64,8 @@ func TestComponent(t *testing.T) { OnStateChange: func(e component.Exports) {}, } - ch1, ch2 := make(chan loki.Entry), make(chan loki.Entry) - r1, r2 := newReceiver(ch1), newReceiver(ch2) + ch1, ch2 := loki.NewLogsReceiver(), loki.NewLogsReceiver() + r1, r2 := newReceiver(ch1.Chan()), newReceiver(ch2.Chan()) // call cancelReceivers to terminate them receiverContext, cancelReceivers := context.WithCancel(context.Background()) @@ -142,8 +142,8 @@ func TestComponent_UpdateWithNewArguments(t *testing.T) { OnStateChange: func(e component.Exports) {}, } - ch1, ch2 := make(chan loki.Entry), make(chan loki.Entry) - r1, r2 := newReceiver(ch1), newReceiver(ch2) + ch1, ch2 := loki.NewLogsReceiver(), loki.NewLogsReceiver() + r1, r2 := newReceiver(ch1.Chan()), newReceiver(ch2.Chan()) // call cancelReceivers to terminate them receiverContext, cancelReceivers := context.WithCancel(context.Background()) diff --git a/component/loki/source/azure_event_hubs/azure_event_hubs.go b/component/loki/source/azure_event_hubs/azure_event_hubs.go index 6185c228c5eb..a92d586f8f07 100644 --- a/component/loki/source/azure_event_hubs/azure_event_hubs.go +++ b/component/loki/source/azure_event_hubs/azure_event_hubs.go @@ -76,7 +76,7 @@ func New(o component.Options, args Arguments) (*Component, error) { c := &Component{ mut: sync.RWMutex{}, opts: o, - handler: make(loki.LogsReceiver), + handler: loki.NewLogsReceiver(), fanout: args.ForwardTo, } @@ -113,10 +113,10 @@ func (c *Component) Run(ctx context.Context) error { select { case <-ctx.Done(): return nil - case entry := <-c.handler: + case entry := <-c.handler.Chan(): c.mut.RLock() for _, receiver := range c.fanout { - receiver <- entry + receiver.Chan() <- entry } c.mut.RUnlock() } @@ -141,7 +141,7 @@ func (c *Component) Update(args component.Arguments) error { return err } - entryHandler := loki.NewEntryHandler(c.handler, func() {}) + entryHandler := loki.NewEntryHandler(c.handler.Chan(), func() {}) t, err := kt.NewSyncer(c.opts.Logger, cfg, entryHandler, &parser.AzureEventHubsTargetMessageParser{ DisallowCustomMessages: newArgs.DisallowCustomMessages, }) diff --git a/component/loki/source/cloudflare/cloudflare.go b/component/loki/source/cloudflare/cloudflare.go index ece2f397f931..2a1cddf9f97d 100644 --- a/component/loki/source/cloudflare/cloudflare.go +++ b/component/loki/source/cloudflare/cloudflare.go @@ -112,7 +112,7 @@ func New(o component.Options, args Arguments) (*Component, error) { c := &Component{ opts: o, metrics: cft.NewMetrics(o.Registerer), - handler: make(loki.LogsReceiver), + handler: loki.NewLogsReceiver(), fanout: args.ForwardTo, posFile: positionsFile, } @@ -138,10 +138,10 @@ func (c *Component) Run(ctx context.Context) error { select { case <-ctx.Done(): return nil - case entry := <-c.handler: + case entry := <-c.handler.Chan(): c.mut.RLock() for _, receiver := range c.fanout { - receiver <- entry + receiver.Chan() <- entry } c.mut.RUnlock() } @@ -159,7 +159,7 @@ func (c *Component) Update(args component.Arguments) error { if c.target != nil { c.target.Stop() } - entryHandler := loki.NewEntryHandler(c.handler, func() {}) + entryHandler := loki.NewEntryHandler(c.handler.Chan(), func() {}) t, err := cft.NewTarget(c.metrics, c.opts.Logger, entryHandler, c.posFile, newArgs.Convert()) if err != nil { diff --git a/component/loki/source/docker/docker.go b/component/loki/source/docker/docker.go index 770ce71794b0..c30798196d0b 100644 --- a/component/loki/source/docker/docker.go +++ b/component/loki/source/docker/docker.go @@ -92,7 +92,7 @@ func New(o component.Options, args Arguments) (*Component, error) { opts: o, metrics: dt.NewMetrics(o.Registerer), - handler: make(loki.LogsReceiver), + handler: loki.NewLogsReceiver(), manager: newManager(o.Logger, nil), receivers: args.ForwardTo, posFile: positionsFile, @@ -125,12 +125,12 @@ func (c *Component) Run(ctx context.Context) error { select { case <-ctx.Done(): return nil - case entry := <-c.handler: + case entry := <-c.handler.Chan(): c.receiversMut.RLock() receivers := c.receivers c.receiversMut.RUnlock() for _, receiver := range receivers { - receiver <- entry + receiver.Chan() <- entry } } } @@ -232,7 +232,7 @@ func (c *Component) getManagerOptions(args Arguments) (*options, error) { return &options{ client: client, - handler: loki.NewEntryHandler(c.handler, func() {}), + handler: loki.NewEntryHandler(c.handler.Chan(), func() {}), positions: c.posFile, }, nil } diff --git a/component/loki/source/file/file.go b/component/loki/source/file/file.go index a67a4b899cde..50b9a5e260df 100644 --- a/component/loki/source/file/file.go +++ b/component/loki/source/file/file.go @@ -80,7 +80,7 @@ func New(o component.Options, args Arguments) (*Component, error) { opts: o, metrics: newMetrics(o.Registerer), - handler: make(loki.LogsReceiver), + handler: loki.NewLogsReceiver(), receivers: args.ForwardTo, posFile: positionsFile, readers: make(map[positions.Entry]reader), @@ -106,7 +106,7 @@ func (c *Component) Run(ctx context.Context) error { r.Stop() } c.posFile.Stop() - close(c.handler) + close(c.handler.Chan()) c.mut.RUnlock() }() @@ -114,10 +114,10 @@ func (c *Component) Run(ctx context.Context) error { select { case <-ctx.Done(): return nil - case entry := <-c.handler: + case entry := <-c.handler.Chan(): c.mut.RLock() for _, receiver := range c.receivers { - receiver <- entry + receiver.Chan() <- entry } c.mut.RUnlock() } @@ -184,7 +184,7 @@ func (c *Component) Update(args component.Arguments) error { c.reportSize(path, labels.String()) - handler := loki.AddLabelsMiddleware(labels).Wrap(loki.NewEntryHandler(c.handler, func() {})) + handler := loki.AddLabelsMiddleware(labels).Wrap(loki.NewEntryHandler(c.handler.Chan(), func() {})) reader, err := c.startTailing(path, labels, handler) if err != nil { continue diff --git a/component/loki/source/file/file_test.go b/component/loki/source/file/file_test.go index 94b1eb70e31a..0d7de903906a 100644 --- a/component/loki/source/file/file_test.go +++ b/component/loki/source/file/file_test.go @@ -36,7 +36,7 @@ func Test(t *testing.T) { ctrl, err := componenttest.NewControllerFromID(util.TestLogger(t), "loki.source.file") require.NoError(t, err) - ch1, ch2 := make(chan loki.Entry), make(chan loki.Entry) + ch1, ch2 := loki.NewLogsReceiver(), loki.NewLogsReceiver() go func() { err := ctrl.Run(ctx, Arguments{ @@ -61,11 +61,11 @@ func Test(t *testing.T) { for i := 0; i < 2; i++ { select { - case logEntry := <-ch1: + case logEntry := <-ch1.Chan(): require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second) require.Equal(t, "writing some text", logEntry.Line) require.Equal(t, wantLabelSet, logEntry.Labels) - case logEntry := <-ch2: + case logEntry := <-ch2.Chan(): require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second) require.Equal(t, "writing some text", logEntry.Line) require.Equal(t, wantLabelSet, logEntry.Labels) @@ -133,7 +133,7 @@ func TestTwoTargets(t *testing.T) { defer f.Close() defer f2.Close() - ch1 := make(chan loki.Entry) + ch1 := loki.NewLogsReceiver() args := Arguments{} args.Targets = []discovery.Target{ {"__path__": f.Name(), "foo": "bar"}, @@ -157,7 +157,7 @@ func TestTwoTargets(t *testing.T) { foundF1, foundF2 := false, false for i := 0; i < 2; i++ { select { - case logEntry := <-ch1: + case logEntry := <-ch1.Chan(): require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second) if logEntry.Line == "text" { foundF1 = true diff --git a/component/loki/source/gcplog/gcplog.go b/component/loki/source/gcplog/gcplog.go index 3b2872c42caf..df01ab8d84b0 100644 --- a/component/loki/source/gcplog/gcplog.go +++ b/component/loki/source/gcplog/gcplog.go @@ -71,7 +71,7 @@ func New(o component.Options, args Arguments) (*Component, error) { c := &Component{ opts: o, metrics: gt.NewMetrics(o.Registerer), - handler: make(loki.LogsReceiver), + handler: loki.NewLogsReceiver(), fanout: args.ForwardTo, serverMetrics: util.NewUncheckedCollector(nil), } @@ -102,10 +102,10 @@ func (c *Component) Run(ctx context.Context) error { select { case <-ctx.Done(): return nil - case entry := <-c.handler: + case entry := <-c.handler.Chan(): c.mut.RLock() for _, receiver := range c.fanout { - receiver <- entry + receiver.Chan() <- entry } c.mut.RUnlock() } @@ -131,7 +131,7 @@ func (c *Component) Update(args component.Arguments) error { level.Error(c.opts.Logger).Log("msg", "error while stopping gcplog target", "err", err) } } - entryHandler := loki.NewEntryHandler(c.handler, func() {}) + entryHandler := loki.NewEntryHandler(c.handler.Chan(), func() {}) jobName := strings.Replace(c.opts.ID, ".", "_", -1) if newArgs.PullTarget != nil { diff --git a/component/loki/source/gcplog/gcplog_test.go b/component/loki/source/gcplog/gcplog_test.go index f3dee5c39ce6..cc3f922c4059 100644 --- a/component/loki/source/gcplog/gcplog_test.go +++ b/component/loki/source/gcplog/gcplog_test.go @@ -33,7 +33,7 @@ func TestPush(t *testing.T) { OnStateChange: func(e component.Exports) {}, } - ch1, ch2 := make(chan loki.Entry), make(chan loki.Entry) + ch1, ch2 := loki.NewLogsReceiver(), loki.NewLogsReceiver() args := Arguments{} port, err := freeport.GetFreePort() @@ -75,11 +75,11 @@ func TestPush(t *testing.T) { for i := 0; i < 2; i++ { select { - case logEntry := <-ch1: + case logEntry := <-ch1.Chan(): require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second) require.Equal(t, wantLogLine, logEntry.Line) require.Equal(t, wantLabelSet, logEntry.Labels) - case logEntry := <-ch2: + case logEntry := <-ch2.Chan(): require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second) require.Equal(t, wantLogLine, logEntry.Line) require.Equal(t, wantLabelSet, logEntry.Labels) diff --git a/component/loki/source/gelf/gelf.go b/component/loki/source/gelf/gelf.go index 4417c6ec81af..109ec09c52b1 100644 --- a/component/loki/source/gelf/gelf.go +++ b/component/loki/source/gelf/gelf.go @@ -55,7 +55,7 @@ func (c *Component) Run(ctx context.Context) error { lokiEntry.Labels["job"] = model.LabelValue(c.o.ID) } for _, r := range c.receivers { - r <- lokiEntry + r.Chan() <- lokiEntry } c.mut.RUnlock() } diff --git a/component/loki/source/gelf/gelf_test.go b/component/loki/source/gelf/gelf_test.go index 247594a26ae7..b9a64dd03be5 100644 --- a/component/loki/source/gelf/gelf_test.go +++ b/component/loki/source/gelf/gelf_test.go @@ -24,7 +24,7 @@ func TestGelf(t *testing.T) { } testMsg := `{"version":"1.1","host":"example.org","short_message":"A short message","timestamp":1231231123,"level":5,"_some_extra":"extra"}` - ch1 := make(chan loki.Entry) + ch1 := loki.NewLogsReceiver() udpListenerAddr := getFreeAddr(t) args := Arguments{ @@ -46,7 +46,7 @@ func TestGelf(t *testing.T) { case <-ctx.Done(): // If this is called then it failed. require.True(t, false) - case e := <-ch1: + case e := <-ch1.Chan(): require.True(t, strings.Contains(e.Entry.Line, "A short message")) found = true } diff --git a/component/loki/source/heroku/heroku.go b/component/loki/source/heroku/heroku.go index 35be609f0b77..8bbbf21c0fda 100644 --- a/component/loki/source/heroku/heroku.go +++ b/component/loki/source/heroku/heroku.go @@ -68,7 +68,7 @@ func New(o component.Options, args Arguments) (*Component, error) { args: Arguments{}, fanout: args.ForwardTo, target: nil, - handler: make(loki.LogsReceiver), + handler: loki.NewLogsReceiver(), serverMetrics: util.NewUncheckedCollector(nil), } @@ -101,10 +101,10 @@ func (c *Component) Run(ctx context.Context) error { select { case <-ctx.Done(): return nil - case entry := <-c.handler: + case entry := <-c.handler.Chan(): c.mut.RLock() for _, receiver := range c.fanout { - receiver <- entry + receiver.Chan() <- entry } c.mut.RUnlock() } @@ -143,7 +143,7 @@ func (c *Component) Update(args component.Arguments) error { registry := prometheus.NewRegistry() c.serverMetrics.SetCollector(registry) - entryHandler := loki.NewEntryHandler(c.handler, func() {}) + entryHandler := loki.NewEntryHandler(c.handler.Chan(), func() {}) t, err := ht.NewHerokuTarget(c.metrics, c.opts.Logger, entryHandler, rcs, newArgs.Convert(), registry) if err != nil { level.Error(c.opts.Logger).Log("msg", "failed to create heroku listener with provided config", "err", err) diff --git a/component/loki/source/heroku/heroku_test.go b/component/loki/source/heroku/heroku_test.go index 4027ce71d3db..886f05d9ac7e 100644 --- a/component/loki/source/heroku/heroku_test.go +++ b/component/loki/source/heroku/heroku_test.go @@ -24,7 +24,7 @@ import ( func TestPush(t *testing.T) { opts := defaultOptions(t) - ch1, ch2 := make(chan loki.Entry), make(chan loki.Entry) + ch1, ch2 := loki.NewLogsReceiver(), loki.NewLogsReceiver() args := testArgsWith(t, func(args *Arguments) { args.ForwardTo = []loki.LogsReceiver{ch1, ch2} args.RelabelRules = rulesExport @@ -51,11 +51,11 @@ func TestPush(t *testing.T) { for i := 0; i < 2; i++ { select { - case logEntry := <-ch1: + case logEntry := <-ch1.Chan(): require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second) require.Equal(t, wantLogLine, logEntry.Line) require.Equal(t, wantLabelSet, logEntry.Labels) - case logEntry := <-ch2: + case logEntry := <-ch2.Chan(): require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second) require.Equal(t, wantLogLine, logEntry.Line) require.Equal(t, wantLabelSet, logEntry.Labels) @@ -217,7 +217,7 @@ func testArgsWithPorts(httpPort int, grpcPort int) Arguments { ListenPort: grpcPort, }, }, - ForwardTo: []loki.LogsReceiver{make(chan loki.Entry), make(chan loki.Entry)}, + ForwardTo: []loki.LogsReceiver{loki.NewLogsReceiver(), loki.NewLogsReceiver()}, Labels: map[string]string{"foo": "bar", "fizz": "buzz"}, RelabelRules: flow_relabel.Rules{ { diff --git a/component/loki/source/journal/journal.go b/component/loki/source/journal/journal.go index 7839415a1b3e..f6888198f90f 100644 --- a/component/loki/source/journal/journal.go +++ b/component/loki/source/journal/journal.go @@ -92,7 +92,7 @@ func (c *Component) Run(ctx context.Context) error { Entry: entry.Entry, } for _, r := range c.receivers { - r <- lokiEntry + r.Chan() <- lokiEntry } c.mut.RUnlock() } diff --git a/component/loki/source/journal/journal_test.go b/component/loki/source/journal/journal_test.go index 86ecc312a454..9a53341167ad 100644 --- a/component/loki/source/journal/journal_test.go +++ b/component/loki/source/journal/journal_test.go @@ -19,7 +19,7 @@ import ( func TestJournal(t *testing.T) { // Create opts for component tmp := t.TempDir() - lr := make(loki.LogsReceiver) + lr := loki.NewLogsReceiver() c, err := New(component.Options{ ID: "loki.source.journal.test", Logger: util.TestFlowLogger(t), @@ -46,7 +46,7 @@ func TestJournal(t *testing.T) { found = true // Timed out getting message require.True(t, false) - case msg := <-lr: + case msg := <-lr.Chan(): if strings.Contains(msg.Line, ts) { found = true break diff --git a/component/loki/source/kafka/kafka.go b/component/loki/source/kafka/kafka.go index a90c871a05f1..0bc09f44f51d 100644 --- a/component/loki/source/kafka/kafka.go +++ b/component/loki/source/kafka/kafka.go @@ -102,7 +102,7 @@ func New(o component.Options, args Arguments) (*Component, error) { mut: sync.RWMutex{}, fanout: args.ForwardTo, target: nil, - handler: make(loki.LogsReceiver), + handler: loki.NewLogsReceiver(), } // Call to Update() to start readers and set receivers once at the start. @@ -132,10 +132,10 @@ func (c *Component) Run(ctx context.Context) error { select { case <-ctx.Done(): return nil - case entry := <-c.handler: + case entry := <-c.handler.Chan(): c.mut.RLock() for _, receiver := range c.fanout { - receiver <- entry + receiver.Chan() <- entry } c.mut.RUnlock() } @@ -157,7 +157,7 @@ func (c *Component) Update(args component.Arguments) error { } } - entryHandler := loki.NewEntryHandler(c.handler, func() {}) + entryHandler := loki.NewEntryHandler(c.handler.Chan(), func() {}) t, err := kt.NewSyncer(c.opts.Logger, newArgs.Convert(), entryHandler, &kt.KafkaTargetMessageParser{}) if err != nil { level.Error(c.opts.Logger).Log("msg", "failed to create kafka client with provided config", "err", err) diff --git a/component/loki/source/kubernetes/kubernetes.go b/component/loki/source/kubernetes/kubernetes.go index 7165a68ad95b..a6463ffb91c8 100644 --- a/component/loki/source/kubernetes/kubernetes.go +++ b/component/loki/source/kubernetes/kubernetes.go @@ -95,7 +95,7 @@ func New(o component.Options, args Arguments) (*Component, error) { c := &Component{ log: o.Logger, opts: o, - handler: make(loki.LogsReceiver), + handler: loki.NewLogsReceiver(), positions: positionsFile, } if err := c.Update(args); err != nil { @@ -123,13 +123,13 @@ func (c *Component) Run(ctx context.Context) error { select { case <-ctx.Done(): return nil - case entry := <-c.handler: + case entry := <-c.handler.Chan(): c.receiversMut.RLock() receivers := c.receivers c.receiversMut.RUnlock() for _, receiver := range receivers { - receiver <- entry + receiver.Chan() <- entry } } } @@ -216,7 +216,7 @@ func (c *Component) getTailerOptions(args Arguments) (*kubetail.Options, error) return &kubetail.Options{ Client: clientSet, - Handler: loki.NewEntryHandler(c.handler, func() {}), + Handler: loki.NewEntryHandler(c.handler.Chan(), func() {}), Positions: c.positions, }, nil } diff --git a/component/loki/source/kubernetes_events/event_controller.go b/component/loki/source/kubernetes_events/event_controller.go index 159ce5804bcc..f76d696afd8c 100644 --- a/component/loki/source/kubernetes_events/event_controller.go +++ b/component/loki/source/kubernetes_events/event_controller.go @@ -66,7 +66,7 @@ func newEventController(task eventControllerTask) *eventController { return &eventController{ log: task.Log, task: task, - handler: loki.NewEntryHandler(task.Receiver, func() {}), + handler: loki.NewEntryHandler(task.Receiver.Chan(), func() {}), positionsKey: key, initTimestamp: time.UnixMicro(lastTimestamp), } diff --git a/component/loki/source/kubernetes_events/kubernetes_events.go b/component/loki/source/kubernetes_events/kubernetes_events.go index 2f275aa9b1bd..a5bacc6af26f 100644 --- a/component/loki/source/kubernetes_events/kubernetes_events.go +++ b/component/loki/source/kubernetes_events/kubernetes_events.go @@ -116,7 +116,7 @@ func New(o component.Options, args Arguments) (*Component, error) { log: o.Logger, opts: o, positions: positionsFile, - handler: make(loki.LogsReceiver), + handler: loki.NewLogsReceiver(), runner: runner.New(func(t eventControllerTask) runner.Worker { return newEventController(t) }), @@ -164,13 +164,13 @@ func (c *Component) Run(ctx context.Context) error { select { case <-ctx.Done(): return nil - case entry := <-c.handler: + case entry := <-c.handler.Chan(): c.receiversMut.RLock() receivers := c.receivers c.receiversMut.RUnlock() for _, receiver := range receivers { - receiver <- entry + receiver.Chan() <- entry } } } diff --git a/component/loki/source/podlogs/podlogs.go b/component/loki/source/podlogs/podlogs.go index 408a0c8e4feb..2b14d0f4d2f7 100644 --- a/component/loki/source/podlogs/podlogs.go +++ b/component/loki/source/podlogs/podlogs.go @@ -113,7 +113,7 @@ func New(o component.Options, args Arguments) (*Component, error) { controller: controller, positions: positionsFile, - handler: make(loki.LogsReceiver), + handler: loki.NewLogsReceiver(), } if err := c.Update(args); err != nil { return nil, err @@ -166,13 +166,13 @@ func (c *Component) runHandler(ctx context.Context) { select { case <-ctx.Done(): return - case entry := <-c.handler: + case entry := <-c.handler.Chan(): c.receiversMut.RLock() receivers := c.receivers c.receiversMut.RUnlock() for _, receiver := range receivers { - receiver <- entry + receiver.Chan() <- entry } } } @@ -221,7 +221,7 @@ func (c *Component) updateTailer(args Arguments) error { managerOpts := &kubetail.Options{ Client: clientSet, - Handler: loki.NewEntryHandler(c.handler, func() {}), + Handler: loki.NewEntryHandler(c.handler.Chan(), func() {}), Positions: c.positions, } c.lastOptions = managerOpts diff --git a/component/loki/source/syslog/syslog.go b/component/loki/source/syslog/syslog.go index 7c0794c7d82c..26c610121df4 100644 --- a/component/loki/source/syslog/syslog.go +++ b/component/loki/source/syslog/syslog.go @@ -50,7 +50,7 @@ func New(o component.Options, args Arguments) (*Component, error) { c := &Component{ opts: o, metrics: st.NewMetrics(o.Registerer), - handler: make(loki.LogsReceiver), + handler: loki.NewLogsReceiver(), fanout: args.ForwardTo, targets: []*st.SyslogTarget{}, @@ -80,10 +80,10 @@ func (c *Component) Run(ctx context.Context) error { select { case <-ctx.Done(): return nil - case entry := <-c.handler: + case entry := <-c.handler.Chan(): c.mut.RLock() for _, receiver := range c.fanout { - receiver <- entry + receiver.Chan() <- entry } c.mut.RUnlock() } @@ -111,7 +111,7 @@ func (c *Component) Update(args component.Arguments) error { } } c.targets = make([]*st.SyslogTarget, 0) - entryHandler := loki.NewEntryHandler(c.handler, func() {}) + entryHandler := loki.NewEntryHandler(c.handler.Chan(), func() {}) for _, cfg := range newArgs.SyslogListeners { t, err := st.NewSyslogTarget(c.metrics, c.opts.Logger, entryHandler, rcs, cfg.Convert()) diff --git a/component/loki/source/syslog/syslog_test.go b/component/loki/source/syslog/syslog_test.go index 9d62715317e6..beec490dad01 100644 --- a/component/loki/source/syslog/syslog_test.go +++ b/component/loki/source/syslog/syslog_test.go @@ -26,7 +26,7 @@ func Test(t *testing.T) { OnStateChange: func(e component.Exports) {}, } - ch1, ch2 := make(chan loki.Entry), make(chan loki.Entry) + ch1, ch2 := loki.NewLogsReceiver(), loki.NewLogsReceiver() args := Arguments{} tcpListenerAddr, udpListenerAddr := getFreeAddr(t), getFreeAddr(t) @@ -63,11 +63,11 @@ func Test(t *testing.T) { for i := 0; i < 2; i++ { select { - case logEntry := <-ch1: + case logEntry := <-ch1.Chan(): require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second) require.Equal(t, "An application event log entry...", logEntry.Line) require.Equal(t, wantLabelSet, logEntry.Labels) - case logEntry := <-ch2: + case logEntry := <-ch2.Chan(): require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second) require.Equal(t, "An application event log entry...", logEntry.Line) require.Equal(t, wantLabelSet, logEntry.Labels) @@ -87,11 +87,11 @@ func Test(t *testing.T) { for i := 0; i < 2; i++ { select { - case logEntry := <-ch1: + case logEntry := <-ch1.Chan(): require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second) require.Equal(t, "An application event log entry...", logEntry.Line) require.Equal(t, wantLabelSet, logEntry.Labels) - case logEntry := <-ch2: + case logEntry := <-ch2.Chan(): require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second) require.Equal(t, "An application event log entry...", logEntry.Line) require.Equal(t, wantLabelSet, logEntry.Labels) @@ -108,7 +108,7 @@ func TestWithRelabelRules(t *testing.T) { OnStateChange: func(e component.Exports) {}, } - ch1 := make(chan loki.Entry) + ch1 := loki.NewLogsReceiver() args := Arguments{} tcpListenerAddr := getFreeAddr(t) @@ -161,7 +161,7 @@ func TestWithRelabelRules(t *testing.T) { } select { - case logEntry := <-ch1: + case logEntry := <-ch1.Chan(): require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second) require.Equal(t, "An application event log entry...", logEntry.Line) require.Equal(t, wantLabelSet, logEntry.Labels) diff --git a/component/loki/source/windowsevent/component_test.go b/component/loki/source/windowsevent/component_test.go index a6b2dbc71de2..514e70973c73 100644 --- a/component/loki/source/windowsevent/component_test.go +++ b/component/loki/source/windowsevent/component_test.go @@ -23,7 +23,7 @@ func TestEventLogger(t *testing.T) { wlog, err := eventlog.Open(loggerName) require.NoError(t, err) dataPath := t.TempDir() - rec := make(loki.LogsReceiver) + rec := loki.NewLogsReceiver() c, err := New(component.Options{ ID: "loki.source.windowsevent.test", Logger: util.TestFlowLogger(t), diff --git a/component/loki/source/windowsevent/component_windows.go b/component/loki/source/windowsevent/component_windows.go index f24748adab69..51b26d485d68 100644 --- a/component/loki/source/windowsevent/component_windows.go +++ b/component/loki/source/windowsevent/component_windows.go @@ -88,7 +88,7 @@ func (c *Component) Run(ctx context.Context) error { Entry: entry.Entry, } for _, receiver := range c.receivers { - receiver <- lokiEntry + receiver.Chan() <- lokiEntry } c.mut.RUnlock() } diff --git a/component/loki/write/write.go b/component/loki/write/write.go index 45e6365409d0..79ea12bfc35a 100644 --- a/component/loki/write/write.go +++ b/component/loki/write/write.go @@ -64,7 +64,7 @@ func New(o component.Options, args Arguments) (*Component, error) { // Create and immediately export the receiver which remains the same for // the component's lifetime. - c.receiver = make(loki.LogsReceiver) + c.receiver = loki.NewLogsReceiver() o.OnStateChange(Exports{Receiver: c.receiver}) // Call to Update() to start readers and set receivers once at the start. @@ -81,7 +81,7 @@ func (c *Component) Run(ctx context.Context) error { select { case <-ctx.Done(): return nil - case entry := <-c.receiver: + case entry := <-c.receiver.Chan(): for _, client := range c.clients { if client != nil { select { diff --git a/component/loki/write/write_test.go b/component/loki/write/write_test.go index d61fdc294786..76f941b6e9be 100644 --- a/component/loki/write/write_test.go +++ b/component/loki/write/write_test.go @@ -100,8 +100,8 @@ func Test(t *testing.T) { } exports := tc.Exports().(Exports) - exports.Receiver <- logEntry - exports.Receiver <- logEntry + exports.Receiver.Chan() <- logEntry + exports.Receiver.Chan() <- logEntry // Wait for our exporter to finish and pass data to our HTTP server. // Make sure the log entries were received correctly. diff --git a/component/otelcol/exporter/loki/internal/convert/convert.go b/component/otelcol/exporter/loki/internal/convert/convert.go index 81bf4968ff34..4db7b089c45b 100644 --- a/component/otelcol/exporter/loki/internal/convert/convert.go +++ b/component/otelcol/exporter/loki/internal/convert/convert.go @@ -114,7 +114,7 @@ func (conv *Converter) ConsumeLogs(ctx context.Context, ld plog.Logs) error { select { case <-ctx.Done(): return nil - case receiver <- entry: + case receiver.Chan() <- entry: // no-op, send the entry along } } diff --git a/component/otelcol/exporter/loki/internal/convert/convert_test.go b/component/otelcol/exporter/loki/internal/convert/convert_test.go index 57d4da08d253..b4c7ffcba88a 100644 --- a/component/otelcol/exporter/loki/internal/convert/convert_test.go +++ b/component/otelcol/exporter/loki/internal/convert/convert_test.go @@ -456,7 +456,7 @@ func TestConverter(t *testing.T) { require.NoError(t, err) l := util.TestLogger(t) - ch1, ch2 := make(loki.LogsReceiver), make(loki.LogsReceiver) + ch1, ch2 := loki.NewLogsReceiver(), loki.NewLogsReceiver() conv := convert.New(l, prometheus.NewRegistry(), []loki.LogsReceiver{ch1, ch2}) go func() { require.NoError(t, conv.ConsumeLogs(context.Background(), payload)) @@ -464,11 +464,11 @@ func TestConverter(t *testing.T) { for i := 0; i < 2; i++ { select { - case l := <-ch1: + case l := <-ch1.Chan(): require.Equal(t, tc.expectLine, l.Line) require.Equal(t, tc.expectLabels, l.Labels.String()) require.Equal(t, tc.expectTimestamp, l.Timestamp.UTC()) - case l := <-ch2: + case l := <-ch2.Chan(): require.Equal(t, tc.expectLine, l.Line) require.Equal(t, tc.expectLabels, l.Labels.String()) require.Equal(t, tc.expectTimestamp, l.Timestamp.UTC()) diff --git a/component/otelcol/receiver/loki/loki.go b/component/otelcol/receiver/loki/loki.go index 7ba7e96d8de9..197ed75f46fb 100644 --- a/component/otelcol/receiver/loki/loki.go +++ b/component/otelcol/receiver/loki/loki.go @@ -67,7 +67,7 @@ func New(o component.Options, c Arguments) (*Component, error) { // Create and immediately export the receiver which remains the same for // the component's lifetime. - res.receiver = make(loki.LogsReceiver) + res.receiver = loki.NewLogsReceiver() o.OnStateChange(Exports{Receiver: res.receiver}) if err := res.Update(c); err != nil { @@ -82,7 +82,7 @@ func (c *Component) Run(ctx context.Context) error { select { case <-ctx.Done(): return nil - case entry := <-c.receiver: + case entry := <-c.receiver.Chan(): stanzaEntry := parsePromtailEntry(entry) plogEntry := adapter.Convert(stanzaEntry) diff --git a/component/otelcol/receiver/loki/loki_test.go b/component/otelcol/receiver/loki/loki_test.go index 6faeb87e82a8..5127dffc5321 100644 --- a/component/otelcol/receiver/loki/loki_test.go +++ b/component/otelcol/receiver/loki/loki_test.go @@ -58,7 +58,7 @@ func Test(t *testing.T) { Line: "It's super effective!", }, } - exports.Receiver <- entry + exports.Receiver.Chan() <- entry }() wantAttributes := map[string]interface{}{