Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #19405 to 7.x: Add more helpers to pipeline/testing package #19711

Merged
merged 1 commit into from
Jul 8, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 77 additions & 2 deletions libbeat/publisher/testing/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,43 +17,118 @@

package testing

import "github.com/elastic/beats/v7/libbeat/beat"
import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/atomic"
)

// ClientCounter can be used to create a beat.PipelineConnector that count
// pipeline connects and disconnects.
type ClientCounter struct {
total atomic.Int
active atomic.Int
}

// FakeConnector implements the beat.PipelineConnector interface.
// The ConnectFunc is called for each connection attempt, and must be provided
// by tests that wish to use FakeConnector. If ConnectFunc is nil tests will panic
// if there is a connection attempt.
type FakeConnector struct {
ConnectFunc func(beat.ClientConfig) (beat.Client, error)
}

// FakeClient implements the beat.Client interface. The implementation of a
// custom PublishFunc and CloseFunc are optional.
type FakeClient struct {
// If set PublishFunc is called for each event that is published by a producer.
PublishFunc func(beat.Event)
CloseFunc func() error

// If set CloseFunc is called on Close. Otherwise Close returns nil.
CloseFunc func() error
}

var _ beat.PipelineConnector = FakeConnector{}
var _ beat.Client = (*FakeClient)(nil)

// ConnectWith calls the ConnectFunc with the given configuration.
func (c FakeConnector) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
return c.ConnectFunc(cfg)
}

// Connect calls the ConnectFunc with an empty configuration.
func (c FakeConnector) Connect() (beat.Client, error) {
return c.ConnectWith(beat.ClientConfig{})
}

// Publish calls PublishFunc, if PublishFunc is not nil.
func (c *FakeClient) Publish(event beat.Event) {
if c.PublishFunc != nil {
c.PublishFunc(event)
}
}

// Close calls CloseFunc, if CloseFunc is not nil. Otherwise it returns nil.
func (c *FakeClient) Close() error {
if c.CloseFunc == nil {
return nil
}
return c.CloseFunc()
}

// PublishAll calls PublishFunc for each event in the given slice.
func (c *FakeClient) PublishAll(events []beat.Event) {
for _, event := range events {
c.Publish(event)
}
}

// FailingConnector creates a pipeline that will always fail with the
// configured error value.
func FailingConnector(err error) beat.PipelineConnector {
return &FakeConnector{
ConnectFunc: func(_ beat.ClientConfig) (beat.Client, error) {
return nil, err
},
}
}

// ConstClient returns a pipeline that always returns the pre-configured beat.Client instance.
func ConstClient(client beat.Client) beat.PipelineConnector {
return &FakeConnector{
ConnectFunc: func(_ beat.ClientConfig) (beat.Client, error) {
return client, nil
},
}
}

// ChClient create a beat.Client that will forward all events to the given channel.
func ChClient(ch chan beat.Event) beat.Client {
return &FakeClient{
PublishFunc: func(event beat.Event) {
ch <- event
},
}
}

// Active returns the number of currently active connections.
func (c *ClientCounter) Active() int { return c.active.Load() }

// Total returns the total number of calls to Connect.
func (c *ClientCounter) Total() int { return c.total.Load() }

// BuildConnector create a pipeline that updates the active and tocal
// connection counters on Connect and Close calls.
func (c *ClientCounter) BuildConnector() beat.PipelineConnector {
return FakeConnector{
ConnectFunc: func(_ beat.ClientConfig) (beat.Client, error) {
c.total.Inc()
c.active.Inc()
return &FakeClient{
CloseFunc: func() error {
c.active.Dec()
return nil
},
}, nil
},
}
}