Skip to content

Commit

Permalink
Make loki.LogsReceiver an interface (#4303)
Browse files Browse the repository at this point in the history
  • Loading branch information
thampiotr authored and clayton-cornell committed Aug 14, 2023
1 parent 2fbfd4f commit 7afad78
Show file tree
Hide file tree
Showing 39 changed files with 164 additions and 145 deletions.
6 changes: 3 additions & 3 deletions component/common/loki/client/fake/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

// Client is a fake client used for testing.
type Client struct {
entries loki.LogsReceiver
entries chan loki.Entry
received []loki.Entry
once sync.Once
mtx sync.Mutex
Expand All @@ -22,7 +22,7 @@ type Client struct {
func NewClient(stop func()) *Client {
c := &Client{
OnStop: stop,
entries: make(loki.LogsReceiver),
entries: make(chan loki.Entry),
}
c.wg.Add(1)
go func() {
Expand All @@ -49,7 +49,7 @@ func (c *Client) Chan() chan<- loki.Entry {

// LogsReceiver returns this client as a LogsReceiver, which is useful in testing.
func (c *Client) LogsReceiver() loki.LogsReceiver {
return c.entries
return loki.NewLogsReceiverWithChannel(c.entries)
}

func (c *Client) Received() []loki.Entry {
Expand Down
28 changes: 24 additions & 4 deletions component/common/loki/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,29 @@ import (
// to an outage or erroring (such as limits being hit).
const finalEntryTimeout = 5 * time.Second

// LogsReceiver is an alias for chan Entry which is used for component
// LogsReceiver is an interface providing `chan Entry` which is used for component
// communication.
type LogsReceiver chan Entry
type LogsReceiver interface {
Chan() chan Entry
}

type logsReceiver struct {
entries chan Entry
}

func (l *logsReceiver) Chan() chan Entry {
return l.entries
}

func NewLogsReceiver() LogsReceiver {
return NewLogsReceiverWithChannel(make(chan Entry))
}

func NewLogsReceiverWithChannel(c chan Entry) LogsReceiver {
return &logsReceiver{
entries: c,
}
}

// Entry is a log entry with labels.
type Entry struct {
Expand Down Expand Up @@ -55,7 +75,7 @@ type EntryHandler interface {
}

// EntryMiddleware takes an EntryHandler and returns another one that will intercept and forward entries.
// The newly created EntryHandler should be Stopped independently from the original one.
// The newly created EntryHandler should be Stopped independently of the original one.
type EntryMiddleware interface {
Wrap(EntryHandler) EntryHandler
}
Expand Down Expand Up @@ -133,7 +153,7 @@ func NewEntryMutatorHandler(next EntryHandler, f EntryMutatorFunc) EntryHandler

select {
case <-ctx.Done():
// The goroutine above exited on its own so we don't have to wait for
// The goroutine above exited on its own, so we don't have to wait for
// the timeout.
case <-time.After(finalEntryTimeout):
// We reached the timeout for sending the final entry to nextChan;
Expand Down
5 changes: 2 additions & 3 deletions component/loki/echo/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,9 @@ type Component struct {

// New creates a new loki.echo component.
func New(o component.Options, args Arguments) (*Component, error) {
ch := make(chan loki.Entry)
c := &Component{
opts: o,
receiver: ch,
receiver: loki.NewLogsReceiver(),
}

// Call to Update() once at the start.
Expand All @@ -77,7 +76,7 @@ func (c *Component) Run(ctx context.Context) error {
select {
case <-ctx.Done():
return nil
case entry := <-c.receiver:
case entry := <-c.receiver.Chan():
level.Info(c.opts.Logger).Log("receiver", c.opts.ID, "entry", entry.Line, "labels", entry.Labels.String())
}
}
Expand Down
8 changes: 4 additions & 4 deletions component/loki/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ func New(o component.Options, args Arguments) (*Component, error) {

// Create and immediately export the receiver which remains the same for
// the component's lifetime.
c.receiver = make(loki.LogsReceiver)
c.processOut = make(loki.LogsReceiver)
c.receiver = loki.NewLogsReceiver()
c.processOut = make(chan loki.Entry)
o.OnStateChange(Exports{Receiver: c.receiver})

// Call to Update() to start readers and set receivers once at the start.
Expand Down Expand Up @@ -126,7 +126,7 @@ func (c *Component) handleIn(ctx context.Context, wg *sync.WaitGroup) {
select {
case <-ctx.Done():
return
case entry := <-c.receiver:
case entry := <-c.receiver.Chan():
c.mut.RLock()
select {
case <-ctx.Done():
Expand Down Expand Up @@ -155,7 +155,7 @@ func (c *Component) handleOut(ctx context.Context, wg *sync.WaitGroup) {
select {
case <-ctx.Done():
return
case f <- entry:
case f.Chan() <- entry:
// no-op
}
}
Expand Down
30 changes: 15 additions & 15 deletions component/loki/process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestJSONLabelsStage(t *testing.T) {
err := river.Unmarshal([]byte(stg), &stagesCfg)
require.NoError(t, err)

ch1, ch2 := make(loki.LogsReceiver), make(loki.LogsReceiver)
ch1, ch2 := loki.NewLogsReceiver(), loki.NewLogsReceiver()

// Create and run the component, so that it can process and forwards logs.
opts := component.Options{
Expand Down Expand Up @@ -97,7 +97,7 @@ func TestJSONLabelsStage(t *testing.T) {
},
}

c.receiver <- logEntry
c.receiver.Chan() <- logEntry

wantLabelSet := model.LabelSet{
"filename": "/var/log/pods/agent/agent/1.log",
Expand All @@ -111,11 +111,11 @@ func TestJSONLabelsStage(t *testing.T) {
// stages correctly applied.
for i := 0; i < 2; i++ {
select {
case logEntry := <-ch1:
case logEntry := <-ch1.Chan():
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
require.Equal(t, logline, logEntry.Line)
require.Equal(t, wantLabelSet, logEntry.Labels)
case logEntry := <-ch2:
case logEntry := <-ch2.Chan():
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
require.Equal(t, logline, logEntry.Line)
require.Equal(t, wantLabelSet, logEntry.Labels)
Expand Down Expand Up @@ -154,7 +154,7 @@ stage.label_keep {
err := river.Unmarshal([]byte(stg), &stagesCfg)
require.NoError(t, err)

ch1, ch2 := make(loki.LogsReceiver), make(loki.LogsReceiver)
ch1, ch2 := loki.NewLogsReceiver(), loki.NewLogsReceiver()

// Create and run the component, so that it can process and forwards logs.
opts := component.Options{
Expand Down Expand Up @@ -184,7 +184,7 @@ stage.label_keep {
},
}

c.receiver <- logEntry
c.receiver.Chan() <- logEntry

wantLabelSet := model.LabelSet{
"filename": "/var/log/pods/agent/agent/1.log",
Expand All @@ -195,11 +195,11 @@ stage.label_keep {
// stages correctly applied.
for i := 0; i < 2; i++ {
select {
case logEntry := <-ch1:
case logEntry := <-ch1.Chan():
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
require.Equal(t, logline, logEntry.Line)
require.Equal(t, wantLabelSet, logEntry.Labels)
case logEntry := <-ch2:
case logEntry := <-ch2.Chan():
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
require.Equal(t, logline, logEntry.Line)
require.Equal(t, wantLabelSet, logEntry.Labels)
Expand Down Expand Up @@ -249,7 +249,7 @@ stage.labels {
err := river.Unmarshal([]byte(stg), &stagesCfg)
require.NoError(t, err)

ch1, ch2 := make(loki.LogsReceiver), make(loki.LogsReceiver)
ch1, ch2 := loki.NewLogsReceiver(), loki.NewLogsReceiver()

// Create and run the component, so that it can process and forwards logs.
opts := component.Options{
Expand Down Expand Up @@ -279,7 +279,7 @@ stage.labels {
},
}

c.receiver <- logEntry
c.receiver.Chan() <- logEntry

wantLabelSet := model.LabelSet{
"filename": "/var/log/pods/agent/agent/1.log",
Expand All @@ -294,11 +294,11 @@ stage.labels {
// stages correctly applied.
for i := 0; i < 2; i++ {
select {
case logEntry := <-ch1:
case logEntry := <-ch1.Chan():
require.Equal(t, wantLogline, logEntry.Line)
require.Equal(t, wantTimestamp, logEntry.Timestamp)
require.Equal(t, wantLabelSet, logEntry.Labels)
case logEntry := <-ch2:
case logEntry := <-ch2.Chan():
require.Equal(t, wantLogline, logEntry.Line)
require.Equal(t, wantTimestamp, logEntry.Timestamp)
require.Equal(t, wantLabelSet, logEntry.Labels)
Expand All @@ -323,7 +323,7 @@ stage.static_labels {
}
`

ch1, ch2 := make(loki.LogsReceiver), make(loki.LogsReceiver)
ch1, ch2 := loki.NewLogsReceiver(), loki.NewLogsReceiver()
var args1, args2 Arguments
require.NoError(t, river.Unmarshal([]byte(stg1), &args1))
require.NoError(t, river.Unmarshal([]byte(stg2), &args2))
Expand Down Expand Up @@ -374,11 +374,11 @@ stage.static_labels {
// race condition between them.
for i := 0; i < 2; i++ {
select {
case logEntry := <-ch1:
case logEntry := <-ch1.Chan():
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
require.Equal(t, "writing some text", logEntry.Line)
require.Equal(t, wantLabelSet.Clone().Merge(model.LabelSet{"lbl": "foo"}), logEntry.Labels)
case logEntry := <-ch2:
case logEntry := <-ch2.Chan():
require.WithinDuration(t, time.Now(), logEntry.Timestamp, 1*time.Second)
require.Equal(t, "writing some text", logEntry.Line)
require.Equal(t, wantLabelSet.Clone().Merge(model.LabelSet{"lbl": "bar"}), logEntry.Labels)
Expand Down
6 changes: 3 additions & 3 deletions component/loki/relabel/relabel.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func New(o component.Options, args Arguments) (*Component, error) {

// Create and immediately export the receiver which remains the same for
// the component's lifetime.
c.receiver = make(loki.LogsReceiver)
c.receiver = loki.NewLogsReceiver()
o.OnStateChange(Exports{Receiver: c.receiver, Rules: args.RelabelConfigs})

// Call to Update() to set the relabelling rules once at the start.
Expand All @@ -107,7 +107,7 @@ func (c *Component) Run(ctx context.Context) error {
select {
case <-ctx.Done():
return nil
case entry := <-c.receiver:
case entry := <-c.receiver.Chan():
c.metrics.entriesProcessed.Inc()
lbls := c.relabel(entry)
if len(lbls) == 0 {
Expand All @@ -121,7 +121,7 @@ func (c *Component) Run(ctx context.Context) error {
select {
case <-ctx.Done():
return nil
case f <- entry:
case f.Chan() <- entry:
}
}
}
Expand Down
Loading

0 comments on commit 7afad78

Please sign in to comment.