Skip to content

Commit

Permalink
adding additional telemetry to logs sender (#11514)
Browse files Browse the repository at this point in the history
* adding additional telemetry

* formatting
  • Loading branch information
gh123man authored Apr 1, 2022
1 parent de56f0c commit 116fe13
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 6 deletions.
7 changes: 5 additions & 2 deletions pkg/logs/sender/destination_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,13 @@ func (d *DestinationSender) Send(payload *message.Payload) bool {
return false
}

// NonBlockingSend tries to send the payload and fails silently if the input is full
func (d *DestinationSender) NonBlockingSend(payload *message.Payload) {
// NonBlockingSend tries to send the payload and fails silently if the input is full.
// returns false if the buffer is full - true if successful.
func (d *DestinationSender) NonBlockingSend(payload *message.Payload) bool {
select {
case d.input <- payload:
return true
default:
}
return false
}
26 changes: 22 additions & 4 deletions pkg/logs/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,18 @@
package sender

import (
"strconv"
"time"

"github.com/DataDog/datadog-agent/pkg/logs/client"
"github.com/DataDog/datadog-agent/pkg/logs/message"
"github.com/DataDog/datadog-agent/pkg/telemetry"
)

var (
tlmPayloadsDropped = telemetry.NewCounter("logs_sender", "payloads_dropped", []string{"reliable", "destination"}, "Payloads dropped")
tlmMessagesDropped = telemetry.NewCounter("logs_sender", "messages_dropped", []string{"reliable", "destination"}, "Messages dropped")
tlmSendWaitTime = telemetry.NewCounter("logs_sender", "send_wait", []string{}, "Time spent waiting for all sends to finish")
)

// Sender sends logs to different destinations. Destinations can be either
Expand Down Expand Up @@ -57,6 +65,7 @@ func (s *Sender) run() {
unreliableDestinations := buildDestinationSenders(s.destinations.Unreliable, sink, s.bufferSize)

for payload := range s.inputChan {
var startInUse = time.Now()

sent := false
for !sent {
Expand All @@ -74,18 +83,27 @@ func (s *Sender) run() {
}
}

for _, destSender := range reliableDestinations {
for i, destSender := range reliableDestinations {
// If an endpoint is stuck in the previous step, try to buffer the payloads if we have room to mitigate
// loss on intermittent failures.
if !destSender.lastSendSucceeded {
destSender.NonBlockingSend(payload)
if !destSender.NonBlockingSend(payload) {
tlmPayloadsDropped.Inc("true", strconv.Itoa(i))
tlmMessagesDropped.Add(float64(len(payload.Messages)), "true", strconv.Itoa(i))
}
}
}

// Attempt to send to unreliable destinations
for _, destSender := range unreliableDestinations {
destSender.NonBlockingSend(payload)
for i, destSender := range unreliableDestinations {
if !destSender.NonBlockingSend(payload) {
tlmPayloadsDropped.Inc("false", strconv.Itoa(i))
tlmMessagesDropped.Add(float64(len(payload.Messages)), "false", strconv.Itoa(i))
}
}

inUse := float64(time.Since(startInUse) / time.Millisecond)
tlmSendWaitTime.Add(inUse)
}

// Cleanup the destinations
Expand Down

0 comments on commit 116fe13

Please sign in to comment.