Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make loki.LogsReceiver an interface #4303

Merged
merged 2 commits into from
Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions component/common/loki/client/fake/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

// Client is a fake client used for testing.
type Client struct {
entries loki.LogsReceiver
entries chan loki.Entry
received []loki.Entry
once sync.Once
mtx sync.Mutex
Expand All @@ -22,7 +22,7 @@ type Client struct {
func NewClient(stop func()) *Client {
c := &Client{
OnStop: stop,
entries: make(loki.LogsReceiver),
entries: make(chan loki.Entry),
}
c.wg.Add(1)
go func() {
Expand All @@ -49,7 +49,7 @@ func (c *Client) Chan() chan<- loki.Entry {

// LogsReceiver returns this client as a LogsReceiver, which is useful in testing.
func (c *Client) LogsReceiver() loki.LogsReceiver {
return c.entries
return loki.NewLogsReceiverWithChannel(c.entries)
}

func (c *Client) Received() []loki.Entry {
Expand Down
28 changes: 24 additions & 4 deletions component/common/loki/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,29 @@ import (
// to an outage or erroring (such as limits being hit).
const finalEntryTimeout = 5 * time.Second

// LogsReceiver is an alias for chan Entry which is used for component
// LogsReceiver is an interface providing `chan Entry` which is used for component
// communication.
type LogsReceiver chan Entry
type LogsReceiver interface {
Chan() chan Entry
}

type logsReceiver struct {
entries chan Entry
}

func (l *logsReceiver) Chan() chan Entry {
return l.entries
}

func NewLogsReceiver() LogsReceiver {
return NewLogsReceiverWithChannel(make(chan Entry))
}

func NewLogsReceiverWithChannel(c chan Entry) LogsReceiver {
return &logsReceiver{
entries: c,
}
}
Comment on lines +26 to +48
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the main change - the rest is just making it work using it.


// Entry is a log entry with labels.
type Entry struct {
Expand Down Expand Up @@ -55,7 +75,7 @@ type EntryHandler interface {
}

// EntryMiddleware takes an EntryHandler and returns another one that will intercept and forward entries.
// The newly created EntryHandler should be Stopped independently from the original one.
// The newly created EntryHandler should be Stopped independently of the original one.
type EntryMiddleware interface {
Wrap(EntryHandler) EntryHandler
}
Expand Down Expand Up @@ -133,7 +153,7 @@ func NewEntryMutatorHandler(next EntryHandler, f EntryMutatorFunc) EntryHandler

select {
case <-ctx.Done():
// The goroutine above exited on its own so we don't have to wait for
// The goroutine above exited on its own, so we don't have to wait for
// the timeout.
case <-time.After(finalEntryTimeout):
// We reached the timeout for sending the final entry to nextChan;
Expand Down
5 changes: 2 additions & 3 deletions component/loki/echo/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,9 @@ type Component struct {

// New creates a new loki.echo component.
func New(o component.Options, args Arguments) (*Component, error) {
ch := make(chan loki.Entry)
c := &Component{
opts: o,
receiver: ch,
receiver: loki.NewLogsReceiver(),
}

// Call to Update() once at the start.
Expand All @@ -77,7 +76,7 @@ func (c *Component) Run(ctx context.Context) error {
select {
case <-ctx.Done():
return nil
case entry := <-c.receiver:
case entry := <-c.receiver.Chan():
level.Info(c.opts.Logger).Log("receiver", c.opts.ID, "entry", entry.Line, "labels", entry.Labels.String())
}
}
Expand Down
8 changes: 4 additions & 4 deletions component/loki/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ func New(o component.Options, args Arguments) (*Component, error) {

// Create and immediately export the receiver which remains the same for
// the component's lifetime.
c.receiver = make(loki.LogsReceiver)
c.processOut = make(loki.LogsReceiver)
c.receiver = loki.NewLogsReceiver()
c.processOut = make(chan loki.Entry)
o.OnStateChange(Exports{Receiver: c.receiver})

// Call to Update() to start readers and set receivers once at the start.
Expand Down Expand Up @@ -126,7 +126,7 @@ func (c *Component) handleIn(ctx context.Context, wg *sync.WaitGroup) {
select {
case <-ctx.Done():
return
case entry := <-c.receiver:
case entry := <-c.receiver.Chan():
c.mut.RLock()
select {
case <-ctx.Done():
Expand Down Expand Up @@ -155,7 +155,7 @@ func (c *Component) handleOut(ctx context.Context, wg *sync.WaitGroup) {
select {
case <-ctx.Done():
return
case f <- entry:
case f.Chan() <- entry:
// no-op
}
}
Expand Down
30 changes: 15 additions & 15 deletions component/loki/process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestJSONLabelsStage(t *testing.T) {
err := river.Unmarshal([]byte(stg), &stagesCfg)
require.NoError(t, err)

ch1, ch2 := make(loki.LogsReceiver), make(loki.LogsReceiver)
ch1, ch2 := loki.NewLogsReceiver(), loki.NewLogsReceiver()

// Create and run the component, so that it can process and forwards logs.
opts := component.Options{
Expand Down Expand Up @@ -97,7 +97,7 @@ func TestJSONLabelsStage(t *testing.T) {
},
}

c.receiver <- logEntry
c.receiver.Chan() <- logEntry

wantLabelSet := model.LabelSet{
"filename": "/var/log/pods/agent/agent/1.log",
Expand All @@ -111,11 +111,11 @@ func TestJSONLabelsStage(t *testing.T) {
// stages correctly applied.
for i := 0; i < 2; i++ {
select {
case logEntry := <-ch1:
case logEntry := <-ch1.Chan():
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
require.Equal(t, logline, logEntry.Line)
require.Equal(t, wantLabelSet, logEntry.Labels)
case logEntry := <-ch2:
case logEntry := <-ch2.Chan():
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
require.Equal(t, logline, logEntry.Line)
require.Equal(t, wantLabelSet, logEntry.Labels)
Expand Down Expand Up @@ -154,7 +154,7 @@ stage.label_keep {
err := river.Unmarshal([]byte(stg), &stagesCfg)
require.NoError(t, err)

ch1, ch2 := make(loki.LogsReceiver), make(loki.LogsReceiver)
ch1, ch2 := loki.NewLogsReceiver(), loki.NewLogsReceiver()

// Create and run the component, so that it can process and forwards logs.
opts := component.Options{
Expand Down Expand Up @@ -184,7 +184,7 @@ stage.label_keep {
},
}

c.receiver <- logEntry
c.receiver.Chan() <- logEntry

wantLabelSet := model.LabelSet{
"filename": "/var/log/pods/agent/agent/1.log",
Expand All @@ -195,11 +195,11 @@ stage.label_keep {
// stages correctly applied.
for i := 0; i < 2; i++ {
select {
case logEntry := <-ch1:
case logEntry := <-ch1.Chan():
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
require.Equal(t, logline, logEntry.Line)
require.Equal(t, wantLabelSet, logEntry.Labels)
case logEntry := <-ch2:
case logEntry := <-ch2.Chan():
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
require.Equal(t, logline, logEntry.Line)
require.Equal(t, wantLabelSet, logEntry.Labels)
Expand Down Expand Up @@ -249,7 +249,7 @@ stage.labels {
err := river.Unmarshal([]byte(stg), &stagesCfg)
require.NoError(t, err)

ch1, ch2 := make(loki.LogsReceiver), make(loki.LogsReceiver)
ch1, ch2 := loki.NewLogsReceiver(), loki.NewLogsReceiver()

// Create and run the component, so that it can process and forwards logs.
opts := component.Options{
Expand Down Expand Up @@ -279,7 +279,7 @@ stage.labels {
},
}

c.receiver <- logEntry
c.receiver.Chan() <- logEntry

wantLabelSet := model.LabelSet{
"filename": "/var/log/pods/agent/agent/1.log",
Expand All @@ -294,11 +294,11 @@ stage.labels {
// stages correctly applied.
for i := 0; i < 2; i++ {
select {
case logEntry := <-ch1:
case logEntry := <-ch1.Chan():
require.Equal(t, wantLogline, logEntry.Line)
require.Equal(t, wantTimestamp, logEntry.Timestamp)
require.Equal(t, wantLabelSet, logEntry.Labels)
case logEntry := <-ch2:
case logEntry := <-ch2.Chan():
require.Equal(t, wantLogline, logEntry.Line)
require.Equal(t, wantTimestamp, logEntry.Timestamp)
require.Equal(t, wantLabelSet, logEntry.Labels)
Expand All @@ -323,7 +323,7 @@ stage.static_labels {
}
`

ch1, ch2 := make(loki.LogsReceiver), make(loki.LogsReceiver)
ch1, ch2 := loki.NewLogsReceiver(), loki.NewLogsReceiver()
var args1, args2 Arguments
require.NoError(t, river.Unmarshal([]byte(stg1), &args1))
require.NoError(t, river.Unmarshal([]byte(stg2), &args2))
Expand Down Expand Up @@ -374,11 +374,11 @@ stage.static_labels {
// race condition between them.
for i := 0; i < 2; i++ {
select {
case logEntry := <-ch1:
case logEntry := <-ch1.Chan():
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
require.Equal(t, "writing some text", logEntry.Line)
require.Equal(t, wantLabelSet.Clone().Merge(model.LabelSet{"lbl": "foo"}), logEntry.Labels)
case logEntry := <-ch2:
case logEntry := <-ch2.Chan():
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
require.Equal(t, "writing some text", logEntry.Line)
require.Equal(t, wantLabelSet.Clone().Merge(model.LabelSet{"lbl": "bar"}), logEntry.Labels)
Expand Down
6 changes: 3 additions & 3 deletions component/loki/relabel/relabel.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func New(o component.Options, args Arguments) (*Component, error) {

// Create and immediately export the receiver which remains the same for
// the component's lifetime.
c.receiver = make(loki.LogsReceiver)
c.receiver = loki.NewLogsReceiver()
o.OnStateChange(Exports{Receiver: c.receiver, Rules: args.RelabelConfigs})

// Call to Update() to set the relabelling rules once at the start.
Expand All @@ -107,7 +107,7 @@ func (c *Component) Run(ctx context.Context) error {
select {
case <-ctx.Done():
return nil
case entry := <-c.receiver:
case entry := <-c.receiver.Chan():
c.metrics.entriesProcessed.Inc()
lbls := c.relabel(entry)
if len(lbls) == 0 {
Expand All @@ -121,7 +121,7 @@ func (c *Component) Run(ctx context.Context) error {
select {
case <-ctx.Done():
return nil
case f <- entry:
case f.Chan() <- entry:
}
}
}
Expand Down
Loading