Add persistence acknowledgement support for shipper #32708

Merged
merged 7 commits on Aug 23, 2022
Changes from 5 commits
4 changes: 2 additions & 2 deletions NOTICE.txt
@@ -10069,11 +10069,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-l

--------------------------------------------------------------------------------
Dependency : github.com/elastic/elastic-agent-shipper-client
Version: v0.2.0
Version: v0.4.0
Licence type (autodetected): Elastic
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-shipper-client@v0.2.0/LICENSE.txt:
Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-shipper-client@v0.4.0/LICENSE.txt:

Elastic License 2.0

2 changes: 1 addition & 1 deletion go.mod
@@ -203,7 +203,7 @@ require (
github.com/elastic/bayeux v1.0.5
github.com/elastic/elastic-agent-autodiscover v0.2.1
github.com/elastic/elastic-agent-libs v0.2.11
github.com/elastic/elastic-agent-shipper-client v0.2.0
github.com/elastic/elastic-agent-shipper-client v0.4.0
github.com/elastic/elastic-agent-system-metrics v0.4.4
github.com/elastic/go-elasticsearch/v8 v8.2.0
github.com/pierrec/lz4/v4 v4.1.15
4 changes: 2 additions & 2 deletions go.sum
@@ -603,8 +603,8 @@ github.com/elastic/elastic-agent-libs v0.2.5/go.mod h1:chO3rtcLyGlKi9S0iGVZhYCzD
github.com/elastic/elastic-agent-libs v0.2.7/go.mod h1:chO3rtcLyGlKi9S0iGVZhYCzDfdDsAQYBc+ui588AFE=
github.com/elastic/elastic-agent-libs v0.2.11 h1:ZeYn35Kxt+IdtMPmE01TaDeaahCg/z7MkGPVWUo6Lp4=
github.com/elastic/elastic-agent-libs v0.2.11/go.mod h1:chO3rtcLyGlKi9S0iGVZhYCzDfdDsAQYBc+ui588AFE=
github.com/elastic/elastic-agent-shipper-client v0.2.0 h1:p+5ep48YCOe+3nICeWmiLwQV11yDLad2n4NunI66Shg=
github.com/elastic/elastic-agent-shipper-client v0.2.0/go.mod h1:OyI2W+Mv3JxlkEF3OeT7K0dbuxvwew8ke2Cf4HpLa9Q=
github.com/elastic/elastic-agent-shipper-client v0.4.0 h1:nsTJF9oo4RHLl+zxFUZqNHaE86C6Ba5aImfegcEf6Sk=
github.com/elastic/elastic-agent-shipper-client v0.4.0/go.mod h1:OyI2W+Mv3JxlkEF3OeT7K0dbuxvwew8ke2Cf4HpLa9Q=
github.com/elastic/elastic-agent-system-metrics v0.4.4 h1:Br3S+TlBhijrLysOvbHscFhgQ00X/trDT5VEnOau0E0=
github.com/elastic/elastic-agent-system-metrics v0.4.4/go.mod h1:tF/f9Off38nfzTZHIVQ++FkXrDm9keFhFpJ+3pQ00iI=
github.com/elastic/elastic-transport-go/v8 v8.1.0 h1:NeqEz1ty4RQz+TVbUrpSU7pZ48XkzGWQj02k5koahIE=
16 changes: 15 additions & 1 deletion libbeat/outputs/shipper/api/shipper_mock.go
@@ -22,17 +22,22 @@ import (

pb "github.com/elastic/elastic-agent-shipper-client/pkg/proto"
"github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages"

"github.com/gofrs/uuid"
)

func NewProducerMock(cap int) *ProducerMock {
id, _ := uuid.NewV4()
return &ProducerMock{
Q: make([]*messages.Event, 0, cap),
UUID: id.String(),
Q: make([]*messages.Event, 0, cap),
}
}

type ProducerMock struct {
pb.UnimplementedProducerServer
Q []*messages.Event
UUID string
Error error
}

@@ -52,5 +57,14 @@ func (p *ProducerMock) PublishEvents(ctx context.Context, r *messages.PublishReq
resp.AcceptedCount++
}

resp.AcceptedIndex = uint64(len(p.Q))

return resp, nil
}

func (p *ProducerMock) PersistedIndex(req *messages.PersistedIndexRequest, producer pb.Producer_PersistedIndexServer) error {
return producer.Send(&messages.PersistedIndexReply{
Uuid: p.UUID,
PersistedIndex: uint64(len(p.Q)),
})
}
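
A note for readers skimming the mock changes: the mock treats an event as persisted the moment it is appended to `Q`, so the `AcceptedIndex` returned by `PublishEvents` and the index that `PersistedIndex` reports under the mock's UUID are both `len(p.Q)`. Below is a minimal sketch of that invariant, assuming it lives in a test file of the same package and that the mock accepts every event while the queue has spare capacity; the example function and placeholder events are illustrative and not part of this PR.

```go
package api

import (
	"context"
	"fmt"

	"github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages"
)

// ExampleProducerMock publishes three placeholder events and shows that the
// mock immediately counts them as both accepted and persisted.
func ExampleProducerMock() {
	mock := NewProducerMock(10)

	reply, err := mock.PublishEvents(context.Background(), &messages.PublishRequest{
		Events: []*messages.Event{{}, {}, {}}, // three empty placeholder events
	})
	if err != nil {
		panic(err)
	}

	// AcceptedCount, AcceptedIndex and the queue length all agree.
	fmt.Println(reply.AcceptedCount, reply.AcceptedIndex, len(mock.Q))
	// Output: 3 3 3
}
```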
18 changes: 12 additions & 6 deletions libbeat/outputs/shipper/config.go
@@ -34,21 +34,27 @@ type Config struct {
// TLS/SSL configuration for secure connection
TLS *tlscommon.Config `config:"ssl"`
// Timeout of a single batch publishing request
Timeout time.Duration `config:"timeout" validate:"min=1"`
Timeout time.Duration `config:"timeout" validate:"min=1"`
// MaxRetries is how many times the same batch is attempted to be sent
MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"`
MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"`
// BulkMaxSize max amount of events in a single batch
BulkMaxSize int `config:"bulk_max_size"`
// AckPollingInterval is an interval for polling the shipper server for persistence acknowledgement.
// The default/minimum 5 ms polling interval means we could publish at most 1000 ms / 5 ms = 200 batches/sec.
// With the default `bulk_max_size` of 50 events this would limit
// the default throughput per worker to 200 * 50 = 10000 events/sec.
AckPollingInterval time.Duration `config:"ack_polling_interval" validate:"min=5ms"`
// Backoff strategy for the shipper output
Backoff backoffConfig `config:"backoff"`
}

func defaultConfig() Config {
return Config{
TLS: nil,
Timeout: 30 * time.Second,
MaxRetries: 3,
BulkMaxSize: 50,
TLS: nil,
Timeout: 30 * time.Second,
MaxRetries: 3,
BulkMaxSize: 50,
AckPollingInterval: 5 * time.Millisecond,
Backoff: backoffConfig{
Init: 1 * time.Second,
Max: 60 * time.Second,
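
The new `AckPollingInterval` comment above derives a per-worker throughput ceiling from the polling interval and the batch size. A standalone sketch of that arithmetic, using the defaults from `defaultConfig()`; the program itself is illustrative only.

```go
package main

import (
	"fmt"
	"time"
)

// Reproduces the bound from the AckPollingInterval comment: with a 5 ms
// polling interval a worker can complete at most 200 batches per second, and
// with bulk_max_size = 50 that is at most 10000 events per second.
func main() {
	ackPollingInterval := 5 * time.Millisecond // defaultConfig().AckPollingInterval
	bulkMaxSize := 50                          // defaultConfig().BulkMaxSize

	batchesPerSec := int(time.Second / ackPollingInterval) // 1000 ms / 5 ms = 200
	eventsPerSec := batchesPerSec * bulkMaxSize            // 200 * 50 = 10000

	fmt.Printf("per worker: at most %d batches/sec, %d events/sec\n", batchesPerSec, eventsPerSec)
}
```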
67 changes: 57 additions & 10 deletions libbeat/outputs/shipper/shipper.go
@@ -39,6 +39,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

@@ -49,6 +50,7 @@ type shipper struct {
client sc.ProducerClient
timeout time.Duration
config Config
serverID string
}

func init() {
@@ -109,11 +111,28 @@ func (c *shipper) Connect() error {
if err != nil {
return fmt.Errorf("shipper connection failed with: %w", err)
}
c.log.Debugf("connect to %s established.", c.config.Server)

c.conn = conn
c.client = sc.NewProducerClient(conn)

indexClient, err := c.client.PersistedIndex(ctx, &messages.PersistedIndexRequest{
PollingInterval: durationpb.New(0), // no need to subscribe, we need it only once
})
if err != nil {
return fmt.Errorf("failed to connect to the server: %w", err)
}
serverInfo, err := indexClient.Recv()
if err != nil {
return fmt.Errorf("failed to fetch server information: %w", err)
}
c.serverID = serverInfo.GetUuid()
err = indexClient.CloseSend()
if err != nil {
c.log.Warnf("failed to close send stream when fetching server info: %s", err)
}

c.log.Debugf("connection to %s (%s) established.", c.config.Server, c.serverID)

return nil
}

@@ -153,37 +172,65 @@ func (c *shipper) Publish(ctx context.Context, batch publisher.Batch) error {

ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
resp, err := c.client.PublishEvents(ctx, &messages.PublishRequest{
publishReply, err := c.client.PublishEvents(ctx, &messages.PublishRequest{
Uuid: c.serverID,
Events: convertedEvents,
})

if status.Code(err) != codes.OK || resp == nil {
if status.Code(err) != codes.OK {
batch.Cancelled() // does not decrease the TTL
st.Cancelled(len(events)) // we cancel the whole batch not just non-dropped events
return fmt.Errorf("failed to publish the batch to the shipper, none of the %d events were accepted: %w", len(convertedEvents), err)
}

// with a correct server implementation should never happen, this error is not recoverable
if int(resp.AcceptedCount) > len(nonDroppedEvents) {
if int(publishReply.AcceptedCount) > len(nonDroppedEvents) {
return fmt.Errorf(
"server returned unexpected results, expected maximum accepted items %d, got %d",
len(nonDroppedEvents),
resp.AcceptedCount,
publishReply.AcceptedCount,
)
}

// the server is supposed to retain the order of the initial events in the response
// judging by the size of the result list we can determine what part of the initial
// list was accepted and we can send the rest of the list for a retry
retries := nonDroppedEvents[resp.AcceptedCount:]
ackClient, err := c.client.PersistedIndex(ctx, &messages.PersistedIndexRequest{
PollingInterval: durationpb.New(c.config.AckPollingInterval),
})
if err != nil {
return fmt.Errorf("acknowledgement failed due to the connectivity error: %w", err)
}

for {
indexReply, err := ackClient.Recv()
if err != nil {
return fmt.Errorf("acknowledgement failed due to the connectivity error: %w", err)
}

if indexReply.GetUuid() != c.serverID {
batch.Cancelled()
st.Cancelled(len(events))
err := fmt.Errorf("acknowledgement failed due to a connection to a different server %s, expected %s", indexReply.Uuid, c.serverID)
c.serverID = indexReply.GetUuid()
return err
}

if indexReply.PersistedIndex >= publishReply.AcceptedIndex {
err = ackClient.CloseSend()
if err != nil {
c.log.Debugf("failed to close send stream after receiving acknowledgement: %s", err)
}
break
}
}

retries := nonDroppedEvents[publishReply.AcceptedCount:]
if len(retries) == 0 {
batch.ACK()
st.Acked(len(nonDroppedEvents))
c.log.Debugf("%d events have been accepted, %d dropped", len(nonDroppedEvents), droppedCount)
} else {
batch.RetryEvents(retries) // decreases TTL unless guaranteed delivery
st.Failed(len(retries))
c.log.Debugf("%d events have been accepted, %d sent for retry, %d dropped", resp.AcceptedCount, len(retries), droppedCount)
c.log.Debugf("%d events have been accepted, %d sent for retry, %d dropped", publishReply.AcceptedCount, len(retries), droppedCount)
}

return nil
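
Taken together, the `Connect` and `Publish` changes above form a publish-then-poll protocol: `PublishEvents` returns the `AcceptedIndex` of the last event the server queued, and the output then reads the `PersistedIndex` stream until the server reports at least that index, also checking that the reported UUID matches the one captured at connect time. The condensed sketch below shows the happy path against the shipper gRPC client, assuming an already established `*grpc.ClientConn`; the UUID check, batch retry, and metrics handling from the real implementation are omitted.

```go
package sketch

import (
	"context"
	"fmt"
	"time"

	pb "github.com/elastic/elastic-agent-shipper-client/pkg/proto"
	"github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages"
	"google.golang.org/grpc"
	"google.golang.org/protobuf/types/known/durationpb"
)

// publishAndWait publishes one batch and blocks until the shipper reports
// that everything up to the accepted index has been persisted.
func publishAndWait(ctx context.Context, conn *grpc.ClientConn, events []*messages.Event) error {
	client := pb.NewProducerClient(conn)

	// 1. Publish the batch; the reply carries the index of the last queued event.
	reply, err := client.PublishEvents(ctx, &messages.PublishRequest{Events: events})
	if err != nil {
		return fmt.Errorf("publish failed: %w", err)
	}

	// 2. Subscribe to persisted-index updates at a fixed polling interval.
	ack, err := client.PersistedIndex(ctx, &messages.PersistedIndexRequest{
		PollingInterval: durationpb.New(5 * time.Millisecond),
	})
	if err != nil {
		return fmt.Errorf("acknowledgement subscription failed: %w", err)
	}
	defer func() { _ = ack.CloseSend() }()

	// 3. Wait until the persisted index catches up with the accepted index.
	for {
		idx, err := ack.Recv()
		if err != nil {
			return fmt.Errorf("acknowledgement failed: %w", err)
		}
		if idx.PersistedIndex >= reply.AcceptedIndex {
			fmt.Printf("batch persisted by server %s\n", idx.GetUuid())
			return nil
		}
	}
}
```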
88 changes: 63 additions & 25 deletions libbeat/outputs/shipper/shipper_test.go
@@ -305,21 +305,11 @@ func TestPublish(t *testing.T) {
})
require.NoError(t, err)

group, err := makeShipper(
nil,
beat.Info{Beat: "libbeat", IndexPrefix: "testbeat"},
outputs.NewNilObserver(),
cfg,
)
require.NoError(t, err)
require.Len(t, group.Clients, 1)
client := createShipperClient(t, cfg)

batch := outest.NewBatch(tc.events...)

err = group.Clients[0].(outputs.Connectable).Connect()
require.NoError(t, err)

err = group.Clients[0].Publish(ctx, batch)
err = client.Publish(ctx, batch)
if tc.expError != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tc.expError)
@@ -348,19 +338,7 @@ func TestPublish(t *testing.T) {
})
require.NoError(t, err)

group, err := makeShipper(
nil,
beat.Info{Beat: "libbeat", IndexPrefix: "testbeat"},
outputs.NewNilObserver(),
cfg,
)
require.NoError(t, err)
require.Len(t, group.Clients, 1)

client := group.Clients[0].(outputs.NetworkClient)

err = client.Connect()
require.NoError(t, err)
client := createShipperClient(t, cfg)

// Should successfully publish with the server running
batch := outest.NewBatch(events...)
@@ -407,6 +385,48 @@ func TestPublish(t *testing.T) {
require.NoError(t, err)
require.Equal(t, expSignals, batch.Signals)
})

t.Run("cancel the batch when a different server responds", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

addr, stop := runServer(t, 5, nil, "localhost:0")
defer stop()

cfg, err := config.NewConfigFrom(map[string]interface{}{
"server": addr,
"timeout": 5, // 5 sec
"backoff": map[string]interface{}{
"init": "10ms",
"max": "5s",
},
})
require.NoError(t, err)

client := createShipperClient(t, cfg)

// Should successfully publish without an ID change
batch := outest.NewBatch(events...)
err = client.Publish(ctx, batch)
require.NoError(t, err)

// Replace the server (would change the ID)
stop()

_, stop = runServer(t, 5, nil, addr)
defer stop()

batch = outest.NewBatch(events...)
err = client.Publish(ctx, batch)
require.Error(t, err)

require.Eventually(t, func() bool {
// the mock server does not validate incoming IDs on `Publish`, so the error should come from
// the acknowledgement request
return strings.Contains(err.Error(), "acknowledgement failed due to a connection to a different server")
}, 100*time.Millisecond, 10*time.Millisecond)
})

}

// BenchmarkToShipperEvent is used to detect performance regression when the conversion function is changed.
@@ -485,6 +505,24 @@ func runServer(t *testing.T, qSize int, err error, listenAddr string) (actualAdd
return actualAddr, stop
}

func createShipperClient(t *testing.T, cfg *config.C) outputs.NetworkClient {
group, err := makeShipper(
nil,
beat.Info{Beat: "libbeat", IndexPrefix: "testbeat"},
outputs.NewNilObserver(),
cfg,
)
require.NoError(t, err)
require.Len(t, group.Clients, 1)

client := group.Clients[0].(outputs.NetworkClient)

err = client.Connect()
require.NoError(t, err)

return client
}

func protoStruct(t *testing.T, values map[string]interface{}) *messages.Struct {
s, err := helpers.NewStruct(values)
require.NoError(t, err)