Skip to content

Commit

Permalink
Add more helpers to pipeline/testing package (#19405)
Browse files Browse the repository at this point in the history
(cherry picked from commit fac91b3)
  • Loading branch information
Steffen Siering committed Jul 7, 2020
1 parent 8f5355d commit 3d51280
Showing 1 changed file with 77 additions and 2 deletions.
79 changes: 77 additions & 2 deletions libbeat/publisher/testing/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,43 +17,118 @@

package testing

import "github.com/elastic/beats/v7/libbeat/beat"
import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/atomic"
)

// ClientCounter can be used to create a beat.PipelineConnector that count
// pipeline connects and disconnects.
type ClientCounter struct {
total atomic.Int
active atomic.Int
}

// FakeConnector implements the beat.PipelineConnector interface.
// The ConnectFunc is called for each connection attempt, and must be provided
// by tests that wish to use FakeConnector. If ConnectFunc is nil tests will panic
// if there is a connection attempt.
type FakeConnector struct {
ConnectFunc func(beat.ClientConfig) (beat.Client, error)
}

// FakeClient implements the beat.Client interface. The implementation of a
// custom PublishFunc and CloseFunc are optional.
type FakeClient struct {
// If set PublishFunc is called for each event that is published by a producer.
PublishFunc func(beat.Event)
CloseFunc func() error

// If set CloseFunc is called on Close. Otherwise Close returns nil.
CloseFunc func() error
}

var _ beat.PipelineConnector = FakeConnector{}
var _ beat.Client = (*FakeClient)(nil)

// ConnectWith calls the ConnectFunc with the given configuration.
func (c FakeConnector) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
return c.ConnectFunc(cfg)
}

// Connect calls the ConnectFunc with an empty configuration.
func (c FakeConnector) Connect() (beat.Client, error) {
return c.ConnectWith(beat.ClientConfig{})
}

// Publish calls PublishFunc, if PublishFunc is not nil.
func (c *FakeClient) Publish(event beat.Event) {
if c.PublishFunc != nil {
c.PublishFunc(event)
}
}

// Close calls CloseFunc, if CloseFunc is not nil. Otherwise it returns nil.
func (c *FakeClient) Close() error {
if c.CloseFunc == nil {
return nil
}
return c.CloseFunc()
}

// PublishAll calls PublishFunc for each event in the given slice.
func (c *FakeClient) PublishAll(events []beat.Event) {
for _, event := range events {
c.Publish(event)
}
}

// FailingConnector creates a pipeline that will always fail with the
// configured error value.
func FailingConnector(err error) beat.PipelineConnector {
return &FakeConnector{
ConnectFunc: func(_ beat.ClientConfig) (beat.Client, error) {
return nil, err
},
}
}

// ConstClient returns a pipeline that always returns the pre-configured beat.Client instance.
func ConstClient(client beat.Client) beat.PipelineConnector {
return &FakeConnector{
ConnectFunc: func(_ beat.ClientConfig) (beat.Client, error) {
return client, nil
},
}
}

// ChClient create a beat.Client that will forward all events to the given channel.
func ChClient(ch chan beat.Event) beat.Client {
return &FakeClient{
PublishFunc: func(event beat.Event) {
ch <- event
},
}
}

// Active returns the number of currently active connections.
func (c *ClientCounter) Active() int { return c.active.Load() }

// Total returns the total number of calls to Connect.
func (c *ClientCounter) Total() int { return c.total.Load() }

// BuildConnector create a pipeline that updates the active and tocal
// connection counters on Connect and Close calls.
func (c *ClientCounter) BuildConnector() beat.PipelineConnector {
return FakeConnector{
ConnectFunc: func(_ beat.ClientConfig) (beat.Client, error) {
c.total.Inc()
c.active.Inc()
return &FakeClient{
CloseFunc: func() error {
c.active.Dec()
return nil
},
}, nil
},
}
}

0 comments on commit 3d51280

Please sign in to comment.