diff --git a/NOTICE.txt b/NOTICE.txt index e23ebb81db43..cdd08c68d430 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -10069,11 +10069,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-l -------------------------------------------------------------------------------- Dependency : github.com/elastic/elastic-agent-shipper-client -Version: v0.2.0 +Version: v0.4.0 Licence type (autodetected): Elastic -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-shipper-client@v0.2.0/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-shipper-client@v0.4.0/LICENSE.txt: Elastic License 2.0 diff --git a/go.mod b/go.mod index 97a4c2a4a840..06059cd0f423 100644 --- a/go.mod +++ b/go.mod @@ -203,7 +203,7 @@ require ( github.com/elastic/bayeux v1.0.5 github.com/elastic/elastic-agent-autodiscover v0.2.1 github.com/elastic/elastic-agent-libs v0.2.11 - github.com/elastic/elastic-agent-shipper-client v0.2.0 + github.com/elastic/elastic-agent-shipper-client v0.4.0 github.com/elastic/elastic-agent-system-metrics v0.4.4 github.com/elastic/go-elasticsearch/v8 v8.2.0 github.com/pierrec/lz4/v4 v4.1.15 diff --git a/go.sum b/go.sum index 7d4451b58186..ef59438d8661 100644 --- a/go.sum +++ b/go.sum @@ -603,8 +603,8 @@ github.com/elastic/elastic-agent-libs v0.2.5/go.mod h1:chO3rtcLyGlKi9S0iGVZhYCzD github.com/elastic/elastic-agent-libs v0.2.7/go.mod h1:chO3rtcLyGlKi9S0iGVZhYCzDfdDsAQYBc+ui588AFE= github.com/elastic/elastic-agent-libs v0.2.11 h1:ZeYn35Kxt+IdtMPmE01TaDeaahCg/z7MkGPVWUo6Lp4= github.com/elastic/elastic-agent-libs v0.2.11/go.mod h1:chO3rtcLyGlKi9S0iGVZhYCzDfdDsAQYBc+ui588AFE= -github.com/elastic/elastic-agent-shipper-client v0.2.0 h1:p+5ep48YCOe+3nICeWmiLwQV11yDLad2n4NunI66Shg= -github.com/elastic/elastic-agent-shipper-client v0.2.0/go.mod h1:OyI2W+Mv3JxlkEF3OeT7K0dbuxvwew8ke2Cf4HpLa9Q= +github.com/elastic/elastic-agent-shipper-client v0.4.0 h1:nsTJF9oo4RHLl+zxFUZqNHaE86C6Ba5aImfegcEf6Sk= +github.com/elastic/elastic-agent-shipper-client v0.4.0/go.mod h1:OyI2W+Mv3JxlkEF3OeT7K0dbuxvwew8ke2Cf4HpLa9Q= github.com/elastic/elastic-agent-system-metrics v0.4.4 h1:Br3S+TlBhijrLysOvbHscFhgQ00X/trDT5VEnOau0E0= github.com/elastic/elastic-agent-system-metrics v0.4.4/go.mod h1:tF/f9Off38nfzTZHIVQ++FkXrDm9keFhFpJ+3pQ00iI= github.com/elastic/elastic-transport-go/v8 v8.1.0 h1:NeqEz1ty4RQz+TVbUrpSU7pZ48XkzGWQj02k5koahIE= diff --git a/libbeat/outputs/shipper/api/shipper_mock.go b/libbeat/outputs/shipper/api/shipper_mock.go index fce9db750cd9..7b9c3534e5de 100644 --- a/libbeat/outputs/shipper/api/shipper_mock.go +++ b/libbeat/outputs/shipper/api/shipper_mock.go @@ -22,17 +22,22 @@ import ( pb "github.com/elastic/elastic-agent-shipper-client/pkg/proto" "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages" + + "github.com/gofrs/uuid" ) func NewProducerMock(cap int) *ProducerMock { + id, _ := uuid.NewV4() return &ProducerMock{ - Q: make([]*messages.Event, 0, cap), + UUID: id.String(), + Q: make([]*messages.Event, 0, cap), } } type ProducerMock struct { pb.UnimplementedProducerServer Q []*messages.Event + UUID string Error error } @@ -52,5 +57,14 @@ func (p *ProducerMock) PublishEvents(ctx context.Context, r *messages.PublishReq resp.AcceptedCount++ } + resp.AcceptedIndex = uint64(len(p.Q)) + return resp, nil } + +func (p *ProducerMock) PersistedIndex(req *messages.PersistedIndexRequest, producer pb.Producer_PersistedIndexServer) error { + return producer.Send(&messages.PersistedIndexReply{ + Uuid: p.UUID, + PersistedIndex: uint64(len(p.Q)), + }) +} diff --git a/libbeat/outputs/shipper/config.go b/libbeat/outputs/shipper/config.go index 7e0a5d10386a..43bef1a63c0e 100644 --- a/libbeat/outputs/shipper/config.go +++ b/libbeat/outputs/shipper/config.go @@ -34,21 +34,27 @@ type Config struct { // TLS/SSL configurationf or secure connection TLS *tlscommon.Config `config:"ssl"` // Timeout of a single batch publishing request - Timeout time.Duration `config:"timeout" validate:"min=1"` + Timeout time.Duration `config:"timeout" validate:"min=1"` // MaxRetries is how many times the same batch is attempted to be sent - MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"` + MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"` // BulkMaxSize max amount of events in a single batch BulkMaxSize int `config:"bulk_max_size"` + // AckPollingInterval is an interval for polling the shipper server for persistence acknowledgement. + // The default/minimum 5 ms polling interval means we could publish at most 1000 ms / 5 ms = 200 batches/sec. + // With the default `bulk_max_size` of 50 events this would limit + // the default throughput per worker to 200 * 50 = 10000 events/sec. + AckPollingInterval time.Duration `config:"ack_polling_interval" validate:"min=5ms"` // Backoff strategy for the shipper output Backoff backoffConfig `config:"backoff"` } func defaultConfig() Config { return Config{ - TLS: nil, - Timeout: 30 * time.Second, - MaxRetries: 3, - BulkMaxSize: 50, + TLS: nil, + Timeout: 30 * time.Second, + MaxRetries: 3, + BulkMaxSize: 50, + AckPollingInterval: 5 * time.Millisecond, Backoff: backoffConfig{ Init: 1 * time.Second, Max: 60 * time.Second, diff --git a/libbeat/outputs/shipper/shipper.go b/libbeat/outputs/shipper/shipper.go index eceedff9cd79..9a9aab658574 100644 --- a/libbeat/outputs/shipper/shipper.go +++ b/libbeat/outputs/shipper/shipper.go @@ -39,6 +39,7 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -49,6 +50,7 @@ type shipper struct { client sc.ProducerClient timeout time.Duration config Config + serverID string } func init() { @@ -109,11 +111,24 @@ func (c *shipper) Connect() error { if err != nil { return fmt.Errorf("shipper connection failed with: %w", err) } - c.log.Debugf("connect to %s established.", c.config.Server) c.conn = conn c.client = sc.NewProducerClient(conn) + indexClient, err := c.client.PersistedIndex(ctx, &messages.PersistedIndexRequest{ + PollingInterval: durationpb.New(0), // no need to subscribe, we need it only once + }) + if err != nil { + return fmt.Errorf("failed to connect to the server: %w", err) + } + serverInfo, err := indexClient.Recv() + if err != nil { + return fmt.Errorf("failed to fetch server information: %w", err) + } + c.serverID = serverInfo.GetUuid() + + c.log.Debugf("connection to %s (%s) established.", c.config.Server, c.serverID) + return nil } @@ -153,29 +168,55 @@ func (c *shipper) Publish(ctx context.Context, batch publisher.Batch) error { ctx, cancel := context.WithTimeout(ctx, c.timeout) defer cancel() - resp, err := c.client.PublishEvents(ctx, &messages.PublishRequest{ + publishReply, err := c.client.PublishEvents(ctx, &messages.PublishRequest{ + Uuid: c.serverID, Events: convertedEvents, }) - if status.Code(err) != codes.OK || resp == nil { + if status.Code(err) != codes.OK { batch.Cancelled() // does not decrease the TTL st.Cancelled(len(events)) // we cancel the whole batch not just non-dropped events return fmt.Errorf("failed to publish the batch to the shipper, none of the %d events were accepted: %w", len(convertedEvents), err) } // with a correct server implementation should never happen, this error is not recoverable - if int(resp.AcceptedCount) > len(nonDroppedEvents) { + if int(publishReply.AcceptedCount) > len(nonDroppedEvents) { return fmt.Errorf( "server returned unexpected results, expected maximum accepted items %d, got %d", len(nonDroppedEvents), - resp.AcceptedCount, + publishReply.AcceptedCount, ) } - // the server is supposed to retain the order of the initial events in the response - // judging by the size of the result list we can determine what part of the initial - // list was accepted and we can send the rest of the list for a retry - retries := nonDroppedEvents[resp.AcceptedCount:] + // so we explicitly close the stream + ackCtx, cancel := context.WithCancel(ctx) + defer cancel() + ackClient, err := c.client.PersistedIndex(ackCtx, &messages.PersistedIndexRequest{ + PollingInterval: durationpb.New(c.config.AckPollingInterval), + }) + if err != nil { + return fmt.Errorf("acknowledgement failed due to the connectivity error: %w", err) + } + + indexReply, err := ackClient.Recv() + if err != nil { + return fmt.Errorf("acknowledgement failed due to the connectivity error: %w", err) + } + + if indexReply.GetUuid() != c.serverID { + batch.Cancelled() + st.Cancelled(len(events)) + return fmt.Errorf("acknowledgement failed due to a connection to a different server %s, expected %s", indexReply.Uuid, c.serverID) + } + + for indexReply.PersistedIndex < publishReply.AcceptedIndex { + indexReply, err = ackClient.Recv() + if err != nil { + return fmt.Errorf("acknowledgement failed due to the connectivity error: %w", err) + } + } + + retries := nonDroppedEvents[publishReply.AcceptedCount:] if len(retries) == 0 { batch.ACK() st.Acked(len(nonDroppedEvents)) @@ -183,7 +224,7 @@ func (c *shipper) Publish(ctx context.Context, batch publisher.Batch) error { } else { batch.RetryEvents(retries) // decreases TTL unless guaranteed delivery st.Failed(len(retries)) - c.log.Debugf("%d events have been accepted, %d sent for retry, %d dropped", resp.AcceptedCount, len(retries), droppedCount) + c.log.Debugf("%d events have been accepted, %d sent for retry, %d dropped", publishReply.AcceptedCount, len(retries), droppedCount) } return nil diff --git a/libbeat/outputs/shipper/shipper_test.go b/libbeat/outputs/shipper/shipper_test.go index 084510064339..9cdd869e9713 100644 --- a/libbeat/outputs/shipper/shipper_test.go +++ b/libbeat/outputs/shipper/shipper_test.go @@ -305,21 +305,11 @@ func TestPublish(t *testing.T) { }) require.NoError(t, err) - group, err := makeShipper( - nil, - beat.Info{Beat: "libbeat", IndexPrefix: "testbeat"}, - outputs.NewNilObserver(), - cfg, - ) - require.NoError(t, err) - require.Len(t, group.Clients, 1) + client := createShipperClient(t, cfg) batch := outest.NewBatch(tc.events...) - err = group.Clients[0].(outputs.Connectable).Connect() - require.NoError(t, err) - - err = group.Clients[0].Publish(ctx, batch) + err = client.Publish(ctx, batch) if tc.expError != "" { require.Error(t, err) require.Contains(t, err.Error(), tc.expError) @@ -348,19 +338,7 @@ func TestPublish(t *testing.T) { }) require.NoError(t, err) - group, err := makeShipper( - nil, - beat.Info{Beat: "libbeat", IndexPrefix: "testbeat"}, - outputs.NewNilObserver(), - cfg, - ) - require.NoError(t, err) - require.Len(t, group.Clients, 1) - - client := group.Clients[0].(outputs.NetworkClient) - - err = client.Connect() - require.NoError(t, err) + client := createShipperClient(t, cfg) // Should successfully publish with the server running batch := outest.NewBatch(events...) @@ -407,6 +385,48 @@ func TestPublish(t *testing.T) { require.NoError(t, err) require.Equal(t, expSignals, batch.Signals) }) + + t.Run("cancel the batch when a different server responds", func(t *testing.T) { + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + addr, stop := runServer(t, 5, nil, "localhost:0") + defer stop() + + cfg, err := config.NewConfigFrom(map[string]interface{}{ + "server": addr, + "timeout": 5, // 5 sec + "backoff": map[string]interface{}{ + "init": "10ms", + "max": "5s", + }, + }) + require.NoError(t, err) + + client := createShipperClient(t, cfg) + + // Should successfully publish without an ID change + batch := outest.NewBatch(events...) + err = client.Publish(ctx, batch) + require.NoError(t, err) + + // Replace the server (would change the ID) + stop() + + _, stop = runServer(t, 5, nil, addr) + defer stop() + + batch = outest.NewBatch(events...) + err = client.Publish(ctx, batch) + require.Error(t, err) + + require.Eventually(t, func() bool { + // the mock server does not validate incoming IDs on `Publish`, so the error should come from + // the acknowledgement request + return strings.Contains(err.Error(), "acknowledgement failed due to a connection to a different server") + }, 100*time.Millisecond, 10*time.Millisecond) + }) + } // BenchmarkToShipperEvent is used to detect performance regression when the conversion function is changed. @@ -485,6 +505,24 @@ func runServer(t *testing.T, qSize int, err error, listenAddr string) (actualAdd return actualAddr, stop } +func createShipperClient(t *testing.T, cfg *config.C) outputs.NetworkClient { + group, err := makeShipper( + nil, + beat.Info{Beat: "libbeat", IndexPrefix: "testbeat"}, + outputs.NewNilObserver(), + cfg, + ) + require.NoError(t, err) + require.Len(t, group.Clients, 1) + + client := group.Clients[0].(outputs.NetworkClient) + + err = client.Connect() + require.NoError(t, err) + + return client +} + func protoStruct(t *testing.T, values map[string]interface{}) *messages.Struct { s, err := helpers.NewStruct(values) require.NoError(t, err)