diff --git a/libbeat/outputs/shipper/README.md b/libbeat/outputs/shipper/README.md index bc1c9dd23f4..dec05449e73 100644 --- a/libbeat/outputs/shipper/README.md +++ b/libbeat/outputs/shipper/README.md @@ -19,6 +19,7 @@ output.shipper: timeout: 30 max_retries: 3 bulk_max_size: 50 + ack_polling_interval: '5ms' backoff: init: 1 max: 60 @@ -61,6 +62,12 @@ Setting `bulk_max_size` to values less than or equal to 0 disables the splitting of batches. When splitting is disabled, the queue decides on the number of events to be contained in a batch. +### `ack_polling_interval` + +The minimum interval for polling persisted index updates from the shipper server. Batches of events are acknowledged asynchronously in the background. If the persisted index value has changed after the `ack_polling_interval` duration, all batches pending acknowledgment are checked against the new value and acknowledged if `persisted_index` >= `accepted_index`. + +The default value is `5ms`; the interval cannot be set to a value less than the default. + ### `backoff.init` The number of seconds to wait before trying to republish to the shipper diff --git a/libbeat/outputs/shipper/api/shipper_mock.go b/libbeat/outputs/shipper/api/shipper_mock.go index 7b9c3534e5d..6b26f100e67 100644 --- a/libbeat/outputs/shipper/api/shipper_mock.go +++ b/libbeat/outputs/shipper/api/shipper_mock.go @@ -19,6 +19,8 @@ package api import ( context "context" + "errors" + "time" pb "github.com/elastic/elastic-agent-shipper-client/pkg/proto" "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages" @@ -29,16 +31,18 @@ import ( func NewProducerMock(cap int) *ProducerMock { id, _ := uuid.NewV4() return &ProducerMock{ - UUID: id.String(), + uuid: id.String(), Q: make([]*messages.Event, 0, cap), } } type ProducerMock struct { pb.UnimplementedProducerServer - Q []*messages.Event - UUID string - Error error + Q []*messages.Event + uuid string + AcceptedCount uint32 + persistedIndex uint64 + Error error } func (p *ProducerMock) PublishEvents(ctx context.Context, r *messages.PublishRequest) (*messages.PublishReply, error) { @@ -46,6 +50,10 @@ func (p *ProducerMock) PublishEvents(ctx context.Context, r *messages.PublishReq return nil, p.Error } + if r.Uuid != p.uuid { + return nil, errors.New("UUID does not match") + } + resp := &messages.PublishReply{} for _, e := range r.Events { @@ -55,6 +63,9 @@ func (p *ProducerMock) PublishEvents(ctx context.Context, r *messages.PublishReq p.Q = append(p.Q, e) resp.AcceptedCount++ + if resp.AcceptedCount == p.AcceptedCount { + break + } } resp.AcceptedIndex = uint64(len(p.Q)) @@ -62,9 +73,34 @@ func (p *ProducerMock) PublishEvents(ctx context.Context, r *messages.PublishReq return resp, nil } +func (p *ProducerMock) Persist(count uint64) { + p.persistedIndex = count +} + func (p *ProducerMock) PersistedIndex(req *messages.PersistedIndexRequest, producer pb.Producer_PersistedIndexServer) error { - return producer.Send(&messages.PersistedIndexReply{ - Uuid: p.UUID, - PersistedIndex: uint64(len(p.Q)), + err := producer.Send(&messages.PersistedIndexReply{ + Uuid: p.uuid, + PersistedIndex: p.persistedIndex, }) + if err != nil { + return err + } + + if !req.PollingInterval.IsValid() || req.PollingInterval.AsDuration() == 0 { + return nil + } + + ticker := time.NewTicker(req.PollingInterval.AsDuration()) + defer ticker.Stop() + + for range ticker.C { + err = producer.Send(&messages.PersistedIndexReply{ + Uuid: p.uuid, + PersistedIndex: p.persistedIndex, }) + if err != nil { + return err + } + } + return nil }
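To make the README's acknowledgment rule concrete: a pending batch becomes acknowledgeable as soon as the server's `persisted_index` reaches the batch's `accepted_index`. The following is a minimal, self-contained Go sketch of that comparison only; the `pendingBatch` type and `ackReady` function are illustrative names and are not part of this change.

```go
package main

import "fmt"

// pendingBatch is a simplified stand-in for a batch waiting for acknowledgment
// (assumed shape, for illustration only).
type pendingBatch struct {
	acceptedIndex uint64
}

// ackReady reports whether a pending batch can be acknowledged given the
// latest persisted index reported by the shipper server.
func ackReady(p pendingBatch, persistedIndex uint64) bool {
	return persistedIndex >= p.acceptedIndex
}

func main() {
	batch := pendingBatch{acceptedIndex: 50}
	fmt.Println(ackReady(batch, 49)) // false: the server has not persisted the whole batch yet
	fmt.Println(ackReady(batch, 50)) // true: everything up to the accepted index is persisted
}
```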
diff --git a/libbeat/outputs/shipper/config.go b/libbeat/outputs/shipper/config.go index 43bef1a63c0..a4c71e05da2 100644 --- a/libbeat/outputs/shipper/config.go +++ b/libbeat/outputs/shipper/config.go @@ -39,10 +39,11 @@ type Config struct { MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"` // BulkMaxSize max amount of events in a single batch BulkMaxSize int `config:"bulk_max_size"` - // AckPollingInterval is an interval for polling the shipper server for persistence acknowledgement. - // The default/minimum 5 ms polling interval means we could publish at most 1000 ms / 5 ms = 200 batches/sec. - // With the default `bulk_max_size` of 50 events this would limit - // the default throughput per worker to 200 * 50 = 10000 events/sec. + // AckPollingInterval is the minimum interval for polling persisted index updates from the shipper server. + // Batches of events are acknowledged asynchronously in the background. + // If the persisted index value has changed after the `AckPollingInterval` duration, + // all batches pending acknowledgment are checked against the new value + // and acknowledged if `persisted_index` >= `accepted_index`. AckPollingInterval time.Duration `config:"ack_polling_interval" validate:"min=5ms"` // Backoff strategy for the shipper output Backoff backoffConfig `config:"backoff"` diff --git a/libbeat/outputs/shipper/shipper.go b/libbeat/outputs/shipper/shipper.go index 9a9aab65857..6163b6fb8f9 100644 --- a/libbeat/outputs/shipper/shipper.go +++ b/libbeat/outputs/shipper/shipper.go @@ -20,7 +20,7 @@ package shipper import ( "context" "fmt" - "time" + "sync" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/outputs" @@ -43,14 +43,29 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) +type pendingBatch struct { + batch publisher.Batch + index uint64 + serverID string + droppedCount int +} + type shipper struct { log *logp.Logger observer outputs.Observer - conn *grpc.ClientConn - client sc.ProducerClient - timeout time.Duration + config Config serverID string + + pending []pendingBatch + pendingMutex sync.Mutex + + conn *grpc.ClientConn + client sc.ProducerClient + clientMutex sync.Mutex + + backgroundCtx context.Context + backgroundCancel func() } func init() { @@ -70,25 +85,29 @@ func makeShipper( return outputs.Fail(err) } - s := outputs.WithBackoff(&shipper{ + s := &shipper{ log: logp.NewLogger("shipper"), observer: observer, config: config, - timeout: config.Timeout, - }, config.Backoff.Init, config.Backoff.Max) + } + + // used by `Close` to stop all the background work, such as the acknowledgment loop + s.backgroundCtx, s.backgroundCancel = context.WithCancel(context.Background()) + + swb := outputs.WithBackoff(s, config.Backoff.Init, config.Backoff.Max) - return outputs.Success(config.BulkMaxSize, config.MaxRetries, s) + return outputs.Success(config.BulkMaxSize, config.MaxRetries, swb) } // Connect establishes connection to the shipper server and implements `outputs.Connectable`.
-func (c *shipper) Connect() error { - tls, err := tlscommon.LoadTLSConfig(c.config.TLS) +func (s *shipper) Connect() error { + tls, err := tlscommon.LoadTLSConfig(s.config.TLS) if err != nil { return fmt.Errorf("invalid shipper TLS configuration: %w", err) } var creds credentials.TransportCredentials - if c.config.TLS != nil && c.config.TLS.Enabled != nil && *c.config.TLS.Enabled { + if s.config.TLS != nil && s.config.TLS.Enabled != nil && *s.config.TLS.Enabled { creds = credentials.NewTLS(tls.ToConfig()) } else { creds = insecure.NewCredentials() @@ -96,158 +115,213 @@ func (c *shipper) Connect() error { opts := []grpc.DialOption{ grpc.WithConnectParams(grpc.ConnectParams{ - MinConnectTimeout: c.config.Timeout, + MinConnectTimeout: s.config.Timeout, }), grpc.WithBlock(), grpc.WithTransportCredentials(creds), } - ctx, cancel := context.WithTimeout(context.Background(), c.config.Timeout) + ctx, cancel := context.WithTimeout(context.Background(), s.config.Timeout) defer cancel() - c.log.Debugf("trying to connect to %s...", c.config.Server) + s.log.Debugf("trying to connect to %s...", s.config.Server) - conn, err := grpc.DialContext(ctx, c.config.Server, opts...) + conn, err := grpc.DialContext(ctx, s.config.Server, opts...) if err != nil { return fmt.Errorf("shipper connection failed with: %w", err) } - c.conn = conn - c.client = sc.NewProducerClient(conn) + s.conn = conn + s.clientMutex.Lock() + defer s.clientMutex.Unlock() + + s.client = sc.NewProducerClient(conn) + + // we no longer need a timeout context here; we use + // `s.backgroundCtx` instead, since this is a long-running client + ackCtx, ackCancel := context.WithCancel(s.backgroundCtx) + defer func() { + // if we return an error before starting the `ackLoop`, + // we don't need this client anymore and must close the stream + if err != nil { + ackCancel() + } + }() - indexClient, err := c.client.PersistedIndex(ctx, &messages.PersistedIndexRequest{ - PollingInterval: durationpb.New(0), // no need to subscribe, we need it only once + indexClient, err := s.client.PersistedIndex(ackCtx, &messages.PersistedIndexRequest{ + PollingInterval: durationpb.New(s.config.AckPollingInterval), }) if err != nil { return fmt.Errorf("failed to connect to the server: %w", err) } - serverInfo, err := indexClient.Recv() + indexReply, err := indexClient.Recv() if err != nil { return fmt.Errorf("failed to fetch server information: %w", err) } - c.serverID = serverInfo.GetUuid() + s.serverID = indexReply.GetUuid() + + s.log.Debugf("connection to %s (%s) established.", s.config.Server, s.serverID) - c.log.Debugf("connection to %s (%s) established.", c.config.Server, c.serverID) + go func() { + defer ackCancel() + s.log.Debugf("starting acknowledgment loop with server %s", s.serverID) + // the loop returns only in case of error + err := s.ackLoop(s.backgroundCtx, indexClient) + s.log.Errorf("acknowledgment loop stopped: %s", err) + }() return nil } // Publish converts and sends a batch of events to the shipper server.
// Also, implements `outputs.Client` -func (c *shipper) Publish(ctx context.Context, batch publisher.Batch) error { - if c.client == nil { +func (s *shipper) Publish(ctx context.Context, batch publisher.Batch) error { + if s.client == nil { return fmt.Errorf("connection is not established") } - st := c.observer + st := s.observer events := batch.Events() st.NewBatch(len(events)) - nonDroppedEvents := make([]publisher.Event, 0, len(events)) - convertedEvents := make([]*messages.Event, 0, len(events)) + toSend := make([]*messages.Event, 0, len(events)) - c.log.Debugf("converting %d events to protobuf...", len(events)) + s.log.Debugf("converting %d events to protobuf...", len(events)) - for i, e := range events { + droppedCount := 0 + for i, e := range events { converted, err := toShipperEvent(e) if err != nil { // conversion errors are not recoverable, so we have to drop the event completely - c.log.Errorf("%d/%d: %q, dropped", i+1, len(events), err) + s.log.Errorf("%d/%d: %q, dropped", i+1, len(events), err) + droppedCount++ continue } - convertedEvents = append(convertedEvents, converted) - nonDroppedEvents = append(nonDroppedEvents, e) + toSend = append(toSend, converted) } - droppedCount := len(events) - len(nonDroppedEvents) + convertedCount := len(toSend) st.Dropped(droppedCount) - c.log.Debugf("%d events converted to protobuf, %d dropped", len(nonDroppedEvents), droppedCount) + s.log.Debugf("%d events converted to protobuf, %d dropped", convertedCount, droppedCount) - ctx, cancel := context.WithTimeout(ctx, c.timeout) - defer cancel() - publishReply, err := c.client.PublishEvents(ctx, &messages.PublishRequest{ - Uuid: c.serverID, - Events: convertedEvents, - }) - - if status.Code(err) != codes.OK { - batch.Cancelled() // does not decrease the TTL - st.Cancelled(len(events)) // we cancel the whole batch not just non-dropped events - return fmt.Errorf("failed to publish the batch to the shipper, none of the %d events were accepted: %w", len(convertedEvents), err) - } + var lastAcceptedIndex uint64 - // with a correct server implementation should never happen, this error is not recoverable - if int(publishReply.AcceptedCount) > len(nonDroppedEvents) { - return fmt.Errorf( - "server returned unexpected results, expected maximum accepted items %d, got %d", - len(nonDroppedEvents), - publishReply.AcceptedCount, - ) - } - - // so we explicitly close the stream - ackCtx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithTimeout(ctx, s.config.Timeout) defer cancel() - ackClient, err := c.client.PersistedIndex(ackCtx, &messages.PersistedIndexRequest{ - PollingInterval: durationpb.New(c.config.AckPollingInterval), - }) - if err != nil { - return fmt.Errorf("acknowledgement failed due to the connectivity error: %w", err) - } - indexReply, err := ackClient.Recv() - if err != nil { - return fmt.Errorf("acknowledgement failed due to the connectivity error: %w", err) - } + for len(toSend) > 0 { + publishReply, err := s.client.PublishEvents(ctx, &messages.PublishRequest{ + Uuid: s.serverID, + Events: toSend, + }) - if indexReply.GetUuid() != c.serverID { - batch.Cancelled() - st.Cancelled(len(events)) - return fmt.Errorf("acknowledgement failed due to a connection to a different server %s, expected %s", indexReply.Uuid, c.serverID) - } + if status.Code(err) != codes.OK { + batch.Cancelled() // does not decrease the TTL + st.Cancelled(len(events)) // we cancel the whole batch not just non-dropped events + return fmt.Errorf("failed to publish the batch to the shipper, none of the %d events 
were accepted: %w", len(toSend), err) + } - for indexReply.PersistedIndex < publishReply.AcceptedIndex { - indexReply, err = ackClient.Recv() - if err != nil { - return fmt.Errorf("acknowledgement failed due to the connectivity error: %w", err) + // with a correct server implementation should never happen, this error is not recoverable + if int(publishReply.AcceptedCount) > len(toSend) { + return fmt.Errorf( + "server returned unexpected results, expected maximum accepted items %d, got %d", + len(toSend), + publishReply.AcceptedCount, + ) } + toSend = toSend[publishReply.AcceptedCount:] + lastAcceptedIndex = publishReply.AcceptedIndex + s.log.Debugf("%d events have been accepted during a publish request", len(toSend)) } - retries := nonDroppedEvents[publishReply.AcceptedCount:] - if len(retries) == 0 { - batch.ACK() - st.Acked(len(nonDroppedEvents)) - c.log.Debugf("%d events have been accepted, %d dropped", len(nonDroppedEvents), droppedCount) - } else { - batch.RetryEvents(retries) // decreases TTL unless guaranteed delivery - st.Failed(len(retries)) - c.log.Debugf("%d events have been accepted, %d sent for retry, %d dropped", publishReply.AcceptedCount, len(retries), droppedCount) - } + s.log.Debugf("total of %d events have been accepted from batch, %d dropped", convertedCount, droppedCount) + + s.pendingMutex.Lock() + s.pending = append(s.pending, pendingBatch{ + batch: batch, + index: lastAcceptedIndex, + serverID: s.serverID, + droppedCount: droppedCount, + }) + s.pendingMutex.Unlock() return nil } // Close closes the connection to the shipper server. // Also, implements `outputs.Client` -func (c *shipper) Close() error { - if c.client == nil { +func (s *shipper) Close() error { + if s.client == nil { return fmt.Errorf("connection is not established") } - err := c.conn.Close() - c.conn = nil - c.client = nil + s.backgroundCancel() + err := s.conn.Close() + s.conn = nil + s.client = nil + s.pending = nil return err } // String implements `outputs.Client` -func (c *shipper) String() string { +func (s *shipper) String() string { return "shipper" } +func (s *shipper) ackLoop(ctx context.Context, ackClient sc.Producer_PersistedIndexClient) error { + st := s.observer + + for { + select { + + case <-ctx.Done(): + return ctx.Err() + + default: + // this sends an update and unblocks only if the `PersistedIndex` value has changed + indexReply, err := ackClient.Recv() + if err != nil { + return fmt.Errorf("acknowledgment failed due to the connectivity error: %w", err) + } + + s.pendingMutex.Lock() + lastProcessed := 0 + for _, p := range s.pending { + if p.serverID != indexReply.Uuid { + s.log.Errorf("acknowledgment failed due to a connection to a different server %s, batch was accepted by %s", indexReply.Uuid, p.serverID) + p.batch.Cancelled() + st.Cancelled(len(p.batch.Events())) + lastProcessed++ + continue + } + + // if we met a batch that is ahead of the persisted index + // we stop iterating and wait for another update from the server. + // The latest pending batch has the max(AcceptedIndex). 
+ if p.index > indexReply.PersistedIndex { + break + } + + p.batch.ACK() + ackedCount := len(p.batch.Events()) - p.droppedCount + st.Acked(ackedCount) + s.log.Debugf("%d events have been acknowledged, %d dropped", ackedCount, p.droppedCount) + lastProcessed++ + } + // don't perform any manipulation when the pending list is empty + // or none of the batches were acknowledged by this persisted index update + if lastProcessed != 0 { + copy(s.pending[0:], s.pending[lastProcessed:]) + s.pending = s.pending[:len(s.pending)-lastProcessed] + } + s.pendingMutex.Unlock() + } + } +} + func convertMapStr(m mapstr.M) (*messages.Value, error) { if m == nil { return helpers.NewNullValue(), nil diff --git a/libbeat/outputs/shipper/shipper_test.go b/libbeat/outputs/shipper/shipper_test.go index 9cdd869e971..af028f886a7 100644 --- a/libbeat/outputs/shipper/shipper_test.go +++ b/libbeat/outputs/shipper/shipper_test.go @@ -22,10 +22,12 @@ import ( "errors" "fmt" "net" + "reflect" "strings" "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/protobuf/proto" @@ -247,12 +249,13 @@ func TestPublish(t *testing.T) { } cases := []struct { - name string - events []beat.Event - expSignals []outest.BatchSignal - serverError error - expError string - qSize int + name string + events []beat.Event + expSignals []outest.BatchSignal + serverError error + expError string + qSize int + acceptedCount uint32 }{ { name: "sends a batch excluding dropped", @@ -269,11 +272,11 @@ func TestPublish(t *testing.T) { events: events, expSignals: []outest.BatchSignal{ { - Tag: outest.BatchRetryEvents, - Events: toPublisherEvents(events[2:]), + Tag: outest.BatchACK, }, }, - qSize: 1, + qSize: 2, + acceptedCount: 1, // we'll enforce 2 `PublishEvents` requests }, { name: "cancels the batch if server error", @@ -297,7 +300,7 @@ func TestPublish(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - addr, stop := runServer(t, tc.qSize, tc.serverError, "localhost:0") + addr, producer, stop := runServer(t, tc.qSize, tc.serverError, "localhost:0") defer stop() cfg, err := config.NewConfigFrom(map[string]interface{}{ @@ -315,17 +318,23 @@ func TestPublish(t *testing.T) { require.Contains(t, err.Error(), tc.expError) } else { require.NoError(t, err) + producer.Persist(uint64(tc.qSize)) // always persist all published events } + assert.Eventually(t, func() bool { + // there is a background routine that checks acknowledgments, + // it should eventually change the status of the batch + return reflect.DeepEqual(tc.expSignals, batch.Signals) + }, 100*time.Millisecond, 10*time.Millisecond) require.Equal(t, tc.expSignals, batch.Signals) }) } - t.Run("cancel the batch when the server is not available", func(t *testing.T) { + t.Run("cancels the batch when a different server responds", func(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - addr, stop := runServer(t, 5, nil, "localhost:0") + addr, _, stop := runServer(t, 5, nil, "localhost:0") defer stop() cfg, err := config.NewConfigFrom(map[string]interface{}{ @@ -340,57 +349,37 @@ func TestPublish(t *testing.T) { client := createShipperClient(t, cfg) - // Should successfully publish with the server running + // Should accept the batch and put it to the pending list batch := outest.NewBatch(events...)
err = client.Publish(ctx, batch) require.NoError(t, err) - expSignals := []outest.BatchSignal{ - { - Tag: outest.BatchACK, - }, - } - require.Equal(t, expSignals, batch.Signals) - - stop() // now stop the server and try sending again - batch = outest.NewBatch(events...) // resetting the batch signals - err = client.Publish(ctx, batch) - require.Error(t, err) - require.Contains(t, err.Error(), "failed to publish the batch to the shipper, none of the 2 events were accepted") - expSignals = []outest.BatchSignal{ - { - Tag: outest.BatchCancelled, // "cancelled" means there will be a retry without decreasing the TTL - }, - } - require.Equal(t, expSignals, batch.Signals) - client.Close() + // Replace the server (would change the ID) + stop() - // Start the server again - _, stop = runServer(t, 5, nil, addr) + _, _, stop = runServer(t, 5, nil, addr) defer stop() + err = client.Connect() + require.NoError(t, err) - batch = outest.NewBatch(events...) // resetting the signals - expSignals = []outest.BatchSignal{ + expSignals := []outest.BatchSignal{ { - Tag: outest.BatchACK, + Tag: outest.BatchCancelled, }, } - - // The backoff wrapper should take care of the errors and - // retries while the server is still starting - err = client.Connect() - require.NoError(t, err) - - err = client.Publish(ctx, batch) - require.NoError(t, err) + assert.Eventually(t, func() bool { + // there is a background routine that checks acknowledgments, + // it should eventually cancel the batch because the IDs don't match + return reflect.DeepEqual(expSignals, batch.Signals) + }, 100*time.Millisecond, 10*time.Millisecond) require.Equal(t, expSignals, batch.Signals) }) - t.Run("cancel the batch when a different server responds", func(t *testing.T) { + t.Run("acks multiple batches", func(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - addr, stop := runServer(t, 5, nil, "localhost:0") + addr, producer, stop := runServer(t, 6, nil, "localhost:0") defer stop() cfg, err := config.NewConfigFrom(map[string]interface{}{ @@ -405,28 +394,38 @@ func TestPublish(t *testing.T) { client := createShipperClient(t, cfg) - // Should successfully publish without an ID change - batch := outest.NewBatch(events...) - err = client.Publish(ctx, batch) + // Should accept the batch and put it to the pending list + batch1 := outest.NewBatch(events...) + err = client.Publish(ctx, batch1) require.NoError(t, err) - // Replace the server (would change the ID) - stop() + batch2 := outest.NewBatch(events...) + err = client.Publish(ctx, batch2) + require.NoError(t, err) - _, stop = runServer(t, 5, nil, addr) - defer stop() + batch3 := outest.NewBatch(events...) + err = client.Publish(ctx, batch3) + require.NoError(t, err) - batch = outest.NewBatch(events...) 
- err = client.Publish(ctx, batch) - require.Error(t, err) + expSignals := []outest.BatchSignal{ + { + Tag: outest.BatchACK, + }, + } - require.Eventually(t, func() bool { - // the mock server does not validate incoming IDs on `Publish`, so the error should come from - // the acknowledgement request - return strings.Contains(err.Error(), "acknowledgement failed due to a connection to a different server") + producer.Persist(6) // 2 events per batch, 3 batches + + assert.Eventually(t, func() bool { + // there is a background routine that checks acknowledgments, + // it should eventually send expected signals + return reflect.DeepEqual(expSignals, batch1.Signals) && + reflect.DeepEqual(expSignals, batch2.Signals) && + reflect.DeepEqual(expSignals, batch3.Signals) }, 100*time.Millisecond, 10*time.Millisecond) + require.Equal(t, expSignals, batch1.Signals, "batch1") + require.Equal(t, expSignals, batch2.Signals, "batch2") + require.Equal(t, expSignals, batch3.Signals, "batch3") }) - } // BenchmarkToShipperEvent is used to detect performance regression when the conversion function is changed. @@ -484,7 +483,7 @@ func BenchmarkToShipperEvent(b *testing.B) { // `err` is a preset error that the server will serve to the client // `listenAddr` is the address for the server to listen // returns `actualAddr` where the listener actually is and the `stop` function to stop the server -func runServer(t *testing.T, qSize int, err error, listenAddr string) (actualAddr string, stop func()) { +func runServer(t *testing.T, qSize int, err error, listenAddr string) (actualAddr string, mock *api.ProducerMock, stop func()) { producer := api.NewProducerMock(qSize) producer.Error = err grpcServer := grpc.NewServer() @@ -502,7 +501,7 @@ func runServer(t *testing.T, qSize int, err error, listenAddr string) (actualAdd listener.Close() } - return actualAddr, stop + return actualAddr, producer, stop } func createShipperClient(t *testing.T, cfg *config.C) outputs.NetworkClient { @@ -540,11 +539,3 @@ func requireEqualProto(t *testing.T, expected, actual proto.Message) { fmt.Sprintf("These two protobuf messages are not equal:\nexpected: %v\nactual: %v", expected, actual), ) } - -func toPublisherEvents(events []beat.Event) []publisher.Event { - converted := make([]publisher.Event, 0, len(events)) - for _, e := range events { - converted = append(converted, publisher.Event{Content: e}) - } - return converted -}
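The updated tests all follow the same asynchronous pattern: publish a batch (which only places it on the pending list), mark its events as persisted on the mock with `Persist`, and then poll until the background acknowledgment loop updates the batch signals. Below is a condensed sketch of that pattern, assuming it lives in `shipper_test.go` so it can reuse the existing `runServer` and `createShipperClient` helpers and the file's imports; the inline events, the minimal config, and the timeouts are simplified placeholders, not code from this change.

```go
func TestAsyncAckSketch(t *testing.T) {
	// start the mock shipper server with capacity for 2 events
	addr, producer, stop := runServer(t, 2, nil, "localhost:0")
	defer stop()

	cfg, err := config.NewConfigFrom(map[string]interface{}{
		"server": addr, // real tests also set timeout, bulk_max_size, etc.
	})
	require.NoError(t, err)

	client := createShipperClient(t, cfg)
	defer client.Close()

	// Publish only converts the events and adds the batch to the pending list;
	// it does not block waiting for persistence.
	events := []beat.Event{
		{Fields: mapstr.M{"message": "first"}},
		{Fields: mapstr.M{"message": "second"}},
	}
	batch := outest.NewBatch(events...)
	require.NoError(t, client.Publish(context.Background(), batch))

	// Mark everything as persisted so the background ack loop can release the batch.
	producer.Persist(2)

	expSignals := []outest.BatchSignal{{Tag: outest.BatchACK}}
	assert.Eventually(t, func() bool {
		// the acknowledgment happens in a background goroutine
		return reflect.DeepEqual(expSignals, batch.Signals)
	}, 100*time.Millisecond, 10*time.Millisecond)
}
```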