diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8d30602cbb13..2873bf917b37 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -152,6 +152,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Heartbeat* +- google-pubsub input: ACK pub/sub message when acknowledged by publisher. {issue}13346[13346] {pull}14715[14715] +- Remove Beta label from google-pubsub input. {issue}13346[13346] {pull}14715[14715] *Journalbeat* diff --git a/x-pack/filebeat/docs/inputs/input-google-pubsub.asciidoc b/x-pack/filebeat/docs/inputs/input-google-pubsub.asciidoc index 2ce88ef276b9..30bc1fb25f72 100644 --- a/x-pack/filebeat/docs/inputs/input-google-pubsub.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-google-pubsub.asciidoc @@ -9,8 +9,6 @@ Google Pub/Sub ++++ -beta[] - Use the `google-pubsub` input to read messages from a Google Cloud Pub/Sub topic subscription. diff --git a/x-pack/filebeat/input/googlepubsub/input.go b/x-pack/filebeat/input/googlepubsub/input.go index 6ee432dce330..c2a5b53f0a6c 100644 --- a/x-pack/filebeat/input/googlepubsub/input.go +++ b/x-pack/filebeat/input/googlepubsub/input.go @@ -56,20 +56,10 @@ func NewInput( cfg *common.Config, connector channel.Connector, inputContext input.Context, -) (input.Input, error) { +) (inp input.Input, err error) { // Extract and validate the input's configuration. conf := defaultConfig() - if err := cfg.Unpack(&conf); err != nil { - return nil, err - } - - // Build outlet for events. - out, err := connector.ConnectWith(cfg, beat.ClientConfig{ - Processing: beat.ProcessingConfig{ - DynamicFields: inputContext.DynamicFields, - }, - }) - if err != nil { + if err = cfg.Unpack(&conf); err != nil { return nil, err } @@ -94,13 +84,31 @@ func NewInput( "pubsub_project", conf.ProjectID, "pubsub_topic", conf.Topic, "pubsub_subscription", conf.Subscription), - outlet: out, inputCtx: inputCtx, workerCtx: workerCtx, workerCancel: workerCancel, ackedCount: atomic.NewUint32(0), } + // Build outlet for events. + in.outlet, err = connector.ConnectWith(cfg, beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + DynamicFields: inputContext.DynamicFields, + }, + ACKEvents: func(privates []interface{}) { + for _, priv := range privates { + if msg, ok := priv.(*pubsub.Message); ok { + msg.Ack() + in.ackedCount.Inc() + } else { + in.log.Error("Failed ACKing pub/sub event") + } + } + }, + }) + if err != nil { + return nil, err + } in.log.Info("Initialized Google Pub/Sub input.") return in, nil } @@ -152,15 +160,11 @@ func (in *pubsubInput) run() error { // Start receiving messages. topicID := makeTopicID(in.ProjectID, in.Topic) return sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) { - if ok := in.outlet.OnEvent(makeEvent(topicID, msg)); ok { - msg.Ack() - in.ackedCount.Inc() - return + if ok := in.outlet.OnEvent(makeEvent(topicID, msg)); !ok { + msg.Nack() + in.log.Debug("OnEvent returned false. Stopping input worker.") + cancel() } - - msg.Nack() - in.log.Debug("OnEvent returned false. Stopping input worker.") - cancel() }) } @@ -205,7 +209,8 @@ func makeEvent(topicID string, msg *pubsub.Message) beat.Event { Meta: common.MapStr{ "id": id, }, - Fields: fields, + Fields: fields, + Private: msg, } } diff --git a/x-pack/filebeat/input/googlepubsub/pubsub_test.go b/x-pack/filebeat/input/googlepubsub/pubsub_test.go index 2f09c1ff8ab9..8efc0034a76f 100644 --- a/x-pack/filebeat/input/googlepubsub/pubsub_test.go +++ b/x-pack/filebeat/input/googlepubsub/pubsub_test.go @@ -23,6 +23,7 @@ import ( "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/atomic" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/tests/compose" "github.com/elastic/beats/libbeat/tests/resources" @@ -211,6 +212,10 @@ func isInDockerIntegTestEnv() bool { } func runTest(t *testing.T, cfg *common.Config, run func(client *pubsub.Client, input *pubsubInput, out *stubOutleter, t *testing.T)) { + runTestWithACKer(t, cfg, ackEvent, run) +} + +func runTestWithACKer(t *testing.T, cfg *common.Config, acker acker, run func(client *pubsub.Client, input *pubsubInput, out *stubOutleter, t *testing.T)) { if !isInDockerIntegTestEnv() { // Don't test goroutines when using our compose.EnsureUp. defer resources.NewGoroutinesChecker().Check(t) @@ -226,10 +231,11 @@ func runTest(t *testing.T, cfg *common.Config, run func(client *pubsub.Client, i defer close(inputCtx.Done) // Stub outlet for receiving events generated by the input. - eventOutlet := newStubOutlet() + eventOutlet := newStubOutlet(acker) defer eventOutlet.Close() - connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) { + connector := channel.ConnectorFunc(func(_ *common.Config, cliCfg beat.ClientConfig) (channel.Outleter, error) { + eventOutlet.setClientConfig(cliCfg) return eventOutlet, nil }) @@ -249,19 +255,46 @@ func newInputContext() input.Context { } } +type acker func(beat.Event, beat.ClientConfig) bool + type stubOutleter struct { sync.Mutex - cond *sync.Cond - done bool - Events []beat.Event + cond *sync.Cond + done bool + Events []beat.Event + clientCfg beat.ClientConfig + acker acker } -func newStubOutlet() *stubOutleter { - o := &stubOutleter{} +func newStubOutlet(acker acker) *stubOutleter { + o := &stubOutleter{ + acker: acker, + } o.cond = sync.NewCond(o) return o } +func ackEvent(ev beat.Event, cfg beat.ClientConfig) bool { + switch { + case cfg.ACKCount != nil: + cfg.ACKCount(1) + case cfg.ACKEvents != nil: + evs := [1]interface{}{ev.Private} + cfg.ACKEvents(evs[:]) + case cfg.ACKLastEvent != nil: + cfg.ACKLastEvent(ev.Private) + default: + return false + } + return true +} + +func (o *stubOutleter) setClientConfig(cfg beat.ClientConfig) { + o.Lock() + defer o.Unlock() + o.clientCfg = cfg +} + func (o *stubOutleter) waitForEvents(numEvents int) ([]beat.Event, bool) { o.Lock() defer o.Unlock() @@ -292,8 +325,11 @@ func (o *stubOutleter) Done() <-chan struct{} { return nil } func (o *stubOutleter) OnEvent(event beat.Event) bool { o.Lock() defer o.Unlock() - o.Events = append(o.Events, event) - o.cond.Broadcast() + acked := o.acker(event, o.clientCfg) + if acked { + o.Events = append(o.Events, event) + o.cond.Broadcast() + } return !o.done } @@ -382,3 +418,53 @@ func TestRunStop(t *testing.T) { input.Stop() }) } + +func TestEndToEndACK(t *testing.T) { + cfg := defaultTestConfig() + + var count atomic.Int + seen := make(map[string]struct{}) + // ACK every other message + halfAcker := func(ev beat.Event, clientConfig beat.ClientConfig) bool { + msg := ev.Private.(*pubsub.Message) + seen[msg.ID] = struct{}{} + if count.Inc()&1 != 0 { + // Nack will result in the Message being redelivered more quickly than if it were allowed to expire. + msg.Nack() + return false + } + return ackEvent(ev, clientConfig) + } + + runTestWithACKer(t, cfg, halfAcker, func(client *pubsub.Client, input *pubsubInput, out *stubOutleter, t *testing.T) { + createTopic(t, client) + createSubscription(t, client) + + group, _ := errgroup.WithContext(context.Background()) + group.Go(input.run) + + const numMsgs = 10 + publishMessages(t, client, numMsgs) + events, ok := out.waitForEvents(numMsgs) + if !ok { + t.Fatalf("Expected %d events, but got %d.", 1, len(events)) + } + + // Assert that all messages were eventually received + assert.Len(t, events, len(seen)) + got := make(map[string]struct{}) + for _, ev := range events { + msg := ev.Private.(*pubsub.Message) + got[msg.ID] = struct{}{} + } + for id := range seen { + _, exists := got[id] + assert.True(t, exists) + } + input.Stop() + out.Close() + if err := group.Wait(); err != nil { + t.Fatal(err) + } + }) +}