add equiv check to filebeat as shipper test
leehinman committed Sep 8, 2023
1 parent 8d3aa91 commit 5de9663
Showing 4 changed files with 99 additions and 11 deletions.
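This commit introduces a ProcessingConfig.DisableType flag so a pipeline client can opt out of the automatically added input.type field, enables it (together with DisableHost) for the shipper input's per-stream clients, and extends the filebeat-as-shipper integration test to check that a document ingested directly and the same document ingested through the shipper land in Elasticsearch as equivalent documents, apart from fields that legitimately differ (timestamps and agent IDs).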
filebeat/channel/runner.go (4 changes: 3 additions & 1 deletion)
@@ -155,7 +155,9 @@ func newCommonConfigEditor(
setOptional(meta, "pipeline", config.Pipeline)
setOptional(fields, "fileset.name", config.Fileset)
setOptional(fields, "service.type", serviceType)
- setOptional(fields, "input.type", config.Type)
+ if !clientCfg.Processing.DisableType {
+     setOptional(fields, "input.type", config.Type)
+ }
if config.Module != "" {
event := mapstr.M{"module": config.Module}
if config.Fileset != "" {
libbeat/beat/pipeline.go (3 changes: 3 additions & 0 deletions)
@@ -128,6 +128,9 @@ type ProcessingConfig struct {
// is applied to events. If nil the Beat's default behavior prevails.
EventNormalization *bool

+ // DisableType disables the automatic addition of the input.type field.
+ DisableType bool
+
// Private contains additional information to be passed to the processing
// pipeline builder.
Private interface{}
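The new flag sits alongside the existing DisableHost knob in ProcessingConfig. As a rough sketch of how a pipeline client opts out of both automatic fields (illustrative only, not code from this commit; the helper name is hypothetical):

import "github.com/elastic/beats/v7/libbeat/beat"

// connectPassthroughClient is a hypothetical helper: it opens a pipeline
// client that suppresses the automatically added host.name and input.type
// fields, leaving whatever the incoming event already carries untouched.
func connectPassthroughClient(pipeline beat.Pipeline) (beat.Client, error) {
    return pipeline.ConnectWith(beat.ClientConfig{
        Processing: beat.ProcessingConfig{
            DisableHost: true, // do not add host.name
            DisableType: true, // do not add input.type (the flag added above)
        },
    })
}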
x-pack/filebeat/input/shipper/input.go (13 changes: 7 additions & 6 deletions)
@@ -52,7 +52,6 @@ type InputManager struct {

// NewInputManager creates a new shipper input manager
func NewInputManager(log *logp.Logger) *InputManager {
-
log.Infof("creating new InputManager")
return &InputManager{
log: log.Named("shipper-beat"),
@@ -68,7 +67,6 @@ func (im *InputManager) Init(_ unison.Group, _ v2.Mode) error {
// Create creates the input from a given config
// in an attempt to speed things up, this will create the processors from the config before we have access to the pipeline to create the clients
func (im *InputManager) Create(cfg *config.C) (v2.Input, error) {
-
config := Instance{}
if err := cfg.Unpack(&config); err != nil {
return nil, fmt.Errorf("error unpacking config: %w", err)
@@ -135,7 +133,7 @@ func (in *shipperInput) Test(ctx v2.TestContext) error {
// Stop the shipper
func (in *shipperInput) Stop() {
in.log.Infof("shipper shutting down")
- //stop individual clients
+ // stop individual clients
for streamID, stream := range in.streams {
err := stream.client.Close()
if err != nil {
@@ -171,8 +169,11 @@ func (in *shipperInput) Run(inputContext v2.Context, pipeline beat.Pipeline) err
PublishMode: beat.GuaranteedSend,
EventListener: acker.TrackingCounter(in.acker.Track),
Processing: beat.ProcessingConfig{
- Processor: streamProc.processors,
+ Processor:   streamProc.processors,
+ DisableHost: true,
+ DisableType: true,
},

CloseRef: inputContext.Cancelation,
})
if err != nil {
@@ -184,7 +185,7 @@ func (in *shipperInput) Run(inputContext v2.Context, pipeline beat.Pipeline) err
in.streams[streamID] = newStreamData
}

- //setup gRPC
+ // setup gRPC
err := in.setupgRPC(pipeline)
if err != nil {
return fmt.Errorf("error starting shipper gRPC server: %w", err)
@@ -264,7 +265,7 @@ func (in *shipperInput) setupgRPC(pipeline beat.Pipeline) error {
}

func (in *shipperInput) sendEvent(event *messages.Event) (uint64, error) {
- //look for matching processor config
+ // look for matching processor config
stream, ok := in.streams[event.Source.StreamId]
if !ok {
return 0, fmt.Errorf("could not find data stream associated with ID '%s'", event.Source.StreamId)
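Why the shipper sets both flags: it republishes events produced by another Beat, and those events already carry host.name and input.type. With the defaults left on, the pipeline would stamp the shipper's own values over them; the test ToDo removed below records that input.type was being rewritten to "shipper". The new equivalence check exists to catch exactly that kind of drift.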
x-pack/filebeat/tests/integration/shipper_test.go (90 changes: 86 additions & 4 deletions)
@@ -9,15 +9,18 @@ package integration
import (
"context"
"crypto/rand"
"encoding/json"
"fmt"
"os"
"path/filepath"
"testing"
"time"

"github.com/sergi/go-diff/diffmatchpatch"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/tests/integration"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/typedapi/core/search"
"github.com/elastic/go-elasticsearch/v8/typedapi/types"
@@ -75,13 +78,29 @@ setup.kibana:
username: %s
password: %s
logging.level: debug
queue.mem:
  events: 100
  flush.min_events: 0
processors:
  - add_fields:
      target: data_stream
      fields:
        type: logs
        namespace: generic
        dataset: generic
+ - add_fields:
+     target: host
+     fields:
+       name: %s
+ - add_fields:
+     target: agent
+     fields:
+       type: metricbeat
`
// check that file can be ingested normally and found in elasticsearch
filebeat := integration.NewBeat(t, "filebeat", "../../filebeat.test")
- filebeat.WriteConfigFile(fmt.Sprintf(cfg, inputFilePath, esURL.Host, esURL.User.Username(), esPassword, kURL.Host, kUserInfo.Username(), kPassword))
+ filebeat.WriteConfigFile(fmt.Sprintf(cfg, inputFilePath, esURL.Host, esURL.User.Username(), esPassword, kURL.Host, kUserInfo.Username(), kPassword, uniqMsg))
filebeat.Start()
filebeat.WaitForLogs("Publish event: ", 10*time.Second)
filebeat.WaitForLogs("PublishEvents: ", 10*time.Second)
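The add_fields processors pin host.name and agent.type to fixed values, since those fields would otherwise legitimately differ between the direct path and the shipper path; the small memory queue with flush.min_events: 0 makes events available to the output immediately instead of waiting for a batch to fill.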
@@ -162,10 +181,18 @@ processors:
        type: logs
        namespace: generic
        dataset: generic
+ - add_fields:
+     target: host
+     fields:
+       name: %s
+ - add_fields:
+     target: agent
+     fields:
+       type: metricbeat
`
// start filebeat with shipper output, make sure doc is ingested into elasticsearch
fb2shipper := integration.NewBeat(t, "filebeat", "../../filebeat.test")
- fb2shipper.WriteConfigFile(fmt.Sprintf(fb2shipperCfg, inputFilePath, gRpcPath, kURL.Host, kUserInfo.Username(), kPassword))
+ fb2shipper.WriteConfigFile(fmt.Sprintf(fb2shipperCfg, inputFilePath, gRpcPath, kURL.Host, kUserInfo.Username(), kPassword, uniqMsg))
fb2shipper.Start()
fb2shipper.WaitForLogs("Publish event: ", 10*time.Second)
fb2shipper.WaitForLogs("events to protobuf", 10*time.Second)
@@ -185,6 +212,61 @@ processors:
require.NoError(t, err, "error doing search request: %s", err)
return res.Hits.Total.Value == 2
}, 30*time.Second, 250*time.Millisecond, "never found 2 documents")
- // ToDo add comparison of docs to make sure they are the same
- // for example right now input.type is being overwritten with shipper

+ res, err := es.Search().
+     Index(".ds-filebeat-*").
+     Request(&search.Request{
+         Query: &types.Query{
+             Match: map[string]types.MatchQuery{
+                 "message": {
+                     Query:    uniqMsg,
+                     Operator: &operator.And,
+                 },
+             },
+         },
+     }).Do(context.Background())
+ require.NoError(t, err, "error doing search request")
+ require.Equal(t, int64(2), res.Hits.Total.Value)
+ diff, err := diffDocs(res.Hits.Hits[0].Source_,
+     res.Hits.Hits[1].Source_)
+ require.NoError(t, err, "error diffing docs")
+ if len(diff) != 0 {
+     t.Fatalf("docs differ:\n%s\n", diff)
+ }
}

+ func diffDocs(doc1 json.RawMessage, doc2 json.RawMessage) (string, error) {
+     fieldsToDrop := []string{
+         "@timestamp",
+         "agent.ephemeral_id",
+         "agent.id",
+         "elastic_agent.id",
+     }
+     var d1 map[string]interface{}
+     var d2 map[string]interface{}
+
+     if err := json.Unmarshal(doc1, &d1); err != nil {
+         return "", err
+     }
+
+     if err := json.Unmarshal(doc2, &d2); err != nil {
+         return "", err
+     }
+     f1 := mapstr.M(d1).Flatten()
+     f2 := mapstr.M(d2).Flatten()
+
+     // drop fields that are expected to differ between the two documents
+     for _, key := range fieldsToDrop {
+         _ = f1.Delete(key)
+     }
+
+     for _, key := range fieldsToDrop {
+         _ = f2.Delete(key)
+     }
+
+     dmp := diffmatchpatch.New()
+     diffs := dmp.DiffMain(f1.StringToPrint(), f2.StringToPrint(), false)
+
+     // identical documents produce exactly one "equal" diff element
+     if len(diffs) != 1 {
+         return dmp.DiffPrettyText(diffs), nil
+     }
+     return "", nil
+ }
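The len(diffs) != 1 check relies on go-diff's behavior that two identical strings produce exactly one "equal" diff element, while any difference yields insert and delete elements as well. A small standalone illustration of that contract (assumed behavior of the library, not code from this commit):

package main

import (
    "fmt"

    "github.com/sergi/go-diff/diffmatchpatch"
)

func main() {
    dmp := diffmatchpatch.New()

    // Identical inputs: DiffMain returns a single DiffEqual element,
    // which is why diffDocs treats len(diffs) == 1 as "documents match".
    same := dmp.DiffMain("input.type: log\n", "input.type: log\n", false)
    fmt.Println(len(same)) // 1

    // Differing inputs: multiple elements; DiffPrettyText renders the
    // insertions and deletions for the test's failure message.
    diffs := dmp.DiffMain("input.type: log\n", "input.type: shipper\n", false)
    fmt.Println(len(diffs), dmp.DiffPrettyText(diffs))
}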
