Skip to content

Commit

Permalink
Remove unused ack tracking (elastic#38983)
Browse files Browse the repository at this point in the history
Remove `EventACKTracker` from `awscloudwatch` event handling.

`EventACKTracker` is buggy (see elastic#38961) but in the `awscloudwatch` input it's also never used -- its callbacks are attached to events, but their result doesn't affect any of the event handling.

This PR doesn't change any functional behavior.
  • Loading branch information
faec authored Apr 23, 2024
1 parent deece39 commit 38f49a9
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 21 deletions.
4 changes: 1 addition & 3 deletions x-pack/filebeat/input/awscloudwatch/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,7 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline)
defer cancelInputCtx()

// Create client for publishing events and receive notification of their ACKs.
client, err := pipeline.ConnectWith(beat.ClientConfig{
EventListener: awscommon.NewEventACKHandler(),
})
client, err := pipeline.ConnectWith(beat.ClientConfig{})
if err != nil {
return fmt.Errorf("failed to create pipeline client: %w", err)
}
Expand Down
7 changes: 0 additions & 7 deletions x-pack/filebeat/input/awscloudwatch/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing"
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)
Expand Down Expand Up @@ -163,12 +162,6 @@ func TestInputWithLogGroupNamePrefix(t *testing.T) {

client := pubtest.NewChanClient(0)
defer close(client.Channel)
go func() {
for event := range client.Channel {
// Fake the ACK handling that's not implemented in pubtest.
event.Private.(*awscommon.EventACKTracker).ACK()
}
}()

var errGroup errgroup.Group
errGroup.Go(func() error {
Expand Down
13 changes: 2 additions & 11 deletions x-pack/filebeat/input/awscloudwatch/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"

"github.com/elastic/beats/v7/libbeat/beat"
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand All @@ -20,7 +19,6 @@ type logProcessor struct {
log *logp.Logger
metrics *inputMetrics
publisher beat.Client
ack *awscommon.EventACKTracker
}

func newLogProcessor(log *logp.Logger, metrics *inputMetrics, publisher beat.Client, ctx context.Context) *logProcessor {
Expand All @@ -31,24 +29,17 @@ func newLogProcessor(log *logp.Logger, metrics *inputMetrics, publisher beat.Cli
log: log,
metrics: metrics,
publisher: publisher,
ack: awscommon.NewEventACKTracker(ctx),
}
}

func (p *logProcessor) processLogEvents(logEvents []types.FilteredLogEvent, logGroup string, regionName string) {
for _, logEvent := range logEvents {
event := createEvent(logEvent, logGroup, regionName)
p.publish(p.ack, &event)
p.metrics.cloudwatchEventsCreatedTotal.Inc()
p.publisher.Publish(event)
}
}

func (p *logProcessor) publish(ack *awscommon.EventACKTracker, event *beat.Event) {
ack.Add()
event.Private = ack
p.metrics.cloudwatchEventsCreatedTotal.Inc()
p.publisher.Publish(*event)
}

func createEvent(logEvent types.FilteredLogEvent, logGroup string, regionName string) beat.Event {
event := beat.Event{
Timestamp: time.Unix(*logEvent.Timestamp/1000, 0).UTC(),
Expand Down

0 comments on commit 38f49a9

Please sign in to comment.