Skip to content

Commit

Permalink
Add persistence acknowledgement support for shipper (#32708)
Browse files Browse the repository at this point in the history
Now the shipper output is able to receive acknowledgement of the
persisted events from the shipper server.

Also, added the UUID verification in case the client is reconnected to
a different server.
  • Loading branch information
rdner authored Aug 23, 2022
1 parent e4f8c43 commit f992631
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 47 deletions.
4 changes: 2 additions & 2 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10069,11 +10069,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-l

--------------------------------------------------------------------------------
Dependency : github.com/elastic/elastic-agent-shipper-client
Version: v0.2.0
Version: v0.4.0
Licence type (autodetected): Elastic
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-shipper-client@v0.2.0/LICENSE.txt:
Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-shipper-client@v0.4.0/LICENSE.txt:

Elastic License 2.0

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ require (
github.com/elastic/bayeux v1.0.5
github.com/elastic/elastic-agent-autodiscover v0.2.1
github.com/elastic/elastic-agent-libs v0.2.11
github.com/elastic/elastic-agent-shipper-client v0.2.0
github.com/elastic/elastic-agent-shipper-client v0.4.0
github.com/elastic/elastic-agent-system-metrics v0.4.4
github.com/elastic/go-elasticsearch/v8 v8.2.0
github.com/pierrec/lz4/v4 v4.1.15
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -603,8 +603,8 @@ github.com/elastic/elastic-agent-libs v0.2.5/go.mod h1:chO3rtcLyGlKi9S0iGVZhYCzD
github.com/elastic/elastic-agent-libs v0.2.7/go.mod h1:chO3rtcLyGlKi9S0iGVZhYCzDfdDsAQYBc+ui588AFE=
github.com/elastic/elastic-agent-libs v0.2.11 h1:ZeYn35Kxt+IdtMPmE01TaDeaahCg/z7MkGPVWUo6Lp4=
github.com/elastic/elastic-agent-libs v0.2.11/go.mod h1:chO3rtcLyGlKi9S0iGVZhYCzDfdDsAQYBc+ui588AFE=
github.com/elastic/elastic-agent-shipper-client v0.2.0 h1:p+5ep48YCOe+3nICeWmiLwQV11yDLad2n4NunI66Shg=
github.com/elastic/elastic-agent-shipper-client v0.2.0/go.mod h1:OyI2W+Mv3JxlkEF3OeT7K0dbuxvwew8ke2Cf4HpLa9Q=
github.com/elastic/elastic-agent-shipper-client v0.4.0 h1:nsTJF9oo4RHLl+zxFUZqNHaE86C6Ba5aImfegcEf6Sk=
github.com/elastic/elastic-agent-shipper-client v0.4.0/go.mod h1:OyI2W+Mv3JxlkEF3OeT7K0dbuxvwew8ke2Cf4HpLa9Q=
github.com/elastic/elastic-agent-system-metrics v0.4.4 h1:Br3S+TlBhijrLysOvbHscFhgQ00X/trDT5VEnOau0E0=
github.com/elastic/elastic-agent-system-metrics v0.4.4/go.mod h1:tF/f9Off38nfzTZHIVQ++FkXrDm9keFhFpJ+3pQ00iI=
github.com/elastic/elastic-transport-go/v8 v8.1.0 h1:NeqEz1ty4RQz+TVbUrpSU7pZ48XkzGWQj02k5koahIE=
Expand Down
16 changes: 15 additions & 1 deletion libbeat/outputs/shipper/api/shipper_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,22 @@ import (

pb "github.com/elastic/elastic-agent-shipper-client/pkg/proto"
"github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages"

"github.com/gofrs/uuid"
)

func NewProducerMock(cap int) *ProducerMock {
id, _ := uuid.NewV4()
return &ProducerMock{
Q: make([]*messages.Event, 0, cap),
UUID: id.String(),
Q: make([]*messages.Event, 0, cap),
}
}

type ProducerMock struct {
pb.UnimplementedProducerServer
Q []*messages.Event
UUID string
Error error
}

Expand All @@ -52,5 +57,14 @@ func (p *ProducerMock) PublishEvents(ctx context.Context, r *messages.PublishReq
resp.AcceptedCount++
}

resp.AcceptedIndex = uint64(len(p.Q))

return resp, nil
}

func (p *ProducerMock) PersistedIndex(req *messages.PersistedIndexRequest, producer pb.Producer_PersistedIndexServer) error {
return producer.Send(&messages.PersistedIndexReply{
Uuid: p.UUID,
PersistedIndex: uint64(len(p.Q)),
})
}
18 changes: 12 additions & 6 deletions libbeat/outputs/shipper/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,27 @@ type Config struct {
// TLS/SSL configurationf or secure connection
TLS *tlscommon.Config `config:"ssl"`
// Timeout of a single batch publishing request
Timeout time.Duration `config:"timeout" validate:"min=1"`
Timeout time.Duration `config:"timeout" validate:"min=1"`
// MaxRetries is how many times the same batch is attempted to be sent
MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"`
MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"`
// BulkMaxSize max amount of events in a single batch
BulkMaxSize int `config:"bulk_max_size"`
// AckPollingInterval is an interval for polling the shipper server for persistence acknowledgement.
// The default/minimum 5 ms polling interval means we could publish at most 1000 ms / 5 ms = 200 batches/sec.
// With the default `bulk_max_size` of 50 events this would limit
// the default throughput per worker to 200 * 50 = 10000 events/sec.
AckPollingInterval time.Duration `config:"ack_polling_interval" validate:"min=5ms"`
// Backoff strategy for the shipper output
Backoff backoffConfig `config:"backoff"`
}

func defaultConfig() Config {
return Config{
TLS: nil,
Timeout: 30 * time.Second,
MaxRetries: 3,
BulkMaxSize: 50,
TLS: nil,
Timeout: 30 * time.Second,
MaxRetries: 3,
BulkMaxSize: 50,
AckPollingInterval: 5 * time.Millisecond,
Backoff: backoffConfig{
Init: 1 * time.Second,
Max: 60 * time.Second,
Expand Down
61 changes: 51 additions & 10 deletions libbeat/outputs/shipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand All @@ -49,6 +50,7 @@ type shipper struct {
client sc.ProducerClient
timeout time.Duration
config Config
serverID string
}

func init() {
Expand Down Expand Up @@ -109,11 +111,24 @@ func (c *shipper) Connect() error {
if err != nil {
return fmt.Errorf("shipper connection failed with: %w", err)
}
c.log.Debugf("connect to %s established.", c.config.Server)

c.conn = conn
c.client = sc.NewProducerClient(conn)

indexClient, err := c.client.PersistedIndex(ctx, &messages.PersistedIndexRequest{
PollingInterval: durationpb.New(0), // no need to subscribe, we need it only once
})
if err != nil {
return fmt.Errorf("failed to connect to the server: %w", err)
}
serverInfo, err := indexClient.Recv()
if err != nil {
return fmt.Errorf("failed to fetch server information: %w", err)
}
c.serverID = serverInfo.GetUuid()

c.log.Debugf("connection to %s (%s) established.", c.config.Server, c.serverID)

return nil
}

Expand Down Expand Up @@ -153,37 +168,63 @@ func (c *shipper) Publish(ctx context.Context, batch publisher.Batch) error {

ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
resp, err := c.client.PublishEvents(ctx, &messages.PublishRequest{
publishReply, err := c.client.PublishEvents(ctx, &messages.PublishRequest{
Uuid: c.serverID,
Events: convertedEvents,
})

if status.Code(err) != codes.OK || resp == nil {
if status.Code(err) != codes.OK {
batch.Cancelled() // does not decrease the TTL
st.Cancelled(len(events)) // we cancel the whole batch not just non-dropped events
return fmt.Errorf("failed to publish the batch to the shipper, none of the %d events were accepted: %w", len(convertedEvents), err)
}

// with a correct server implementation should never happen, this error is not recoverable
if int(resp.AcceptedCount) > len(nonDroppedEvents) {
if int(publishReply.AcceptedCount) > len(nonDroppedEvents) {
return fmt.Errorf(
"server returned unexpected results, expected maximum accepted items %d, got %d",
len(nonDroppedEvents),
resp.AcceptedCount,
publishReply.AcceptedCount,
)
}

// the server is supposed to retain the order of the initial events in the response
// judging by the size of the result list we can determine what part of the initial
// list was accepted and we can send the rest of the list for a retry
retries := nonDroppedEvents[resp.AcceptedCount:]
// so we explicitly close the stream
ackCtx, cancel := context.WithCancel(ctx)
defer cancel()
ackClient, err := c.client.PersistedIndex(ackCtx, &messages.PersistedIndexRequest{
PollingInterval: durationpb.New(c.config.AckPollingInterval),
})
if err != nil {
return fmt.Errorf("acknowledgement failed due to the connectivity error: %w", err)
}

indexReply, err := ackClient.Recv()
if err != nil {
return fmt.Errorf("acknowledgement failed due to the connectivity error: %w", err)
}

if indexReply.GetUuid() != c.serverID {
batch.Cancelled()
st.Cancelled(len(events))
return fmt.Errorf("acknowledgement failed due to a connection to a different server %s, expected %s", indexReply.Uuid, c.serverID)
}

for indexReply.PersistedIndex < publishReply.AcceptedIndex {
indexReply, err = ackClient.Recv()
if err != nil {
return fmt.Errorf("acknowledgement failed due to the connectivity error: %w", err)
}
}

retries := nonDroppedEvents[publishReply.AcceptedCount:]
if len(retries) == 0 {
batch.ACK()
st.Acked(len(nonDroppedEvents))
c.log.Debugf("%d events have been accepted, %d dropped", len(nonDroppedEvents), droppedCount)
} else {
batch.RetryEvents(retries) // decreases TTL unless guaranteed delivery
st.Failed(len(retries))
c.log.Debugf("%d events have been accepted, %d sent for retry, %d dropped", resp.AcceptedCount, len(retries), droppedCount)
c.log.Debugf("%d events have been accepted, %d sent for retry, %d dropped", publishReply.AcceptedCount, len(retries), droppedCount)
}

return nil
Expand Down
88 changes: 63 additions & 25 deletions libbeat/outputs/shipper/shipper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,21 +305,11 @@ func TestPublish(t *testing.T) {
})
require.NoError(t, err)

group, err := makeShipper(
nil,
beat.Info{Beat: "libbeat", IndexPrefix: "testbeat"},
outputs.NewNilObserver(),
cfg,
)
require.NoError(t, err)
require.Len(t, group.Clients, 1)
client := createShipperClient(t, cfg)

batch := outest.NewBatch(tc.events...)

err = group.Clients[0].(outputs.Connectable).Connect()
require.NoError(t, err)

err = group.Clients[0].Publish(ctx, batch)
err = client.Publish(ctx, batch)
if tc.expError != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tc.expError)
Expand Down Expand Up @@ -348,19 +338,7 @@ func TestPublish(t *testing.T) {
})
require.NoError(t, err)

group, err := makeShipper(
nil,
beat.Info{Beat: "libbeat", IndexPrefix: "testbeat"},
outputs.NewNilObserver(),
cfg,
)
require.NoError(t, err)
require.Len(t, group.Clients, 1)

client := group.Clients[0].(outputs.NetworkClient)

err = client.Connect()
require.NoError(t, err)
client := createShipperClient(t, cfg)

// Should successfully publish with the server running
batch := outest.NewBatch(events...)
Expand Down Expand Up @@ -407,6 +385,48 @@ func TestPublish(t *testing.T) {
require.NoError(t, err)
require.Equal(t, expSignals, batch.Signals)
})

t.Run("cancel the batch when a different server responds", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

addr, stop := runServer(t, 5, nil, "localhost:0")
defer stop()

cfg, err := config.NewConfigFrom(map[string]interface{}{
"server": addr,
"timeout": 5, // 5 sec
"backoff": map[string]interface{}{
"init": "10ms",
"max": "5s",
},
})
require.NoError(t, err)

client := createShipperClient(t, cfg)

// Should successfully publish without an ID change
batch := outest.NewBatch(events...)
err = client.Publish(ctx, batch)
require.NoError(t, err)

// Replace the server (would change the ID)
stop()

_, stop = runServer(t, 5, nil, addr)
defer stop()

batch = outest.NewBatch(events...)
err = client.Publish(ctx, batch)
require.Error(t, err)

require.Eventually(t, func() bool {
// the mock server does not validate incoming IDs on `Publish`, so the error should come from
// the acknowledgement request
return strings.Contains(err.Error(), "acknowledgement failed due to a connection to a different server")
}, 100*time.Millisecond, 10*time.Millisecond)
})

}

// BenchmarkToShipperEvent is used to detect performance regression when the conversion function is changed.
Expand Down Expand Up @@ -485,6 +505,24 @@ func runServer(t *testing.T, qSize int, err error, listenAddr string) (actualAdd
return actualAddr, stop
}

func createShipperClient(t *testing.T, cfg *config.C) outputs.NetworkClient {
group, err := makeShipper(
nil,
beat.Info{Beat: "libbeat", IndexPrefix: "testbeat"},
outputs.NewNilObserver(),
cfg,
)
require.NoError(t, err)
require.Len(t, group.Clients, 1)

client := group.Clients[0].(outputs.NetworkClient)

err = client.Connect()
require.NoError(t, err)

return client
}

func protoStruct(t *testing.T, values map[string]interface{}) *messages.Struct {
s, err := helpers.NewStruct(values)
require.NoError(t, err)
Expand Down

0 comments on commit f992631

Please sign in to comment.