[exporter/splunkhec] Add traces batching (#8995)
* [Splunk HEC exporter] Add traces batching

* Update exporter/splunkhecexporter/client.go

Co-authored-by: Dmitrii Anoshin <[email protected]>

Co-authored-by: Dmitrii Anoshin <[email protected]>
atoulme and dmitryax authored Apr 6, 2022
1 parent 0ed3674 commit 698254a
Showing 9 changed files with 795 additions and 128 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -3,6 +3,7 @@
## Unreleased

### 💡 Enhancements 💡
- `splunkhecexporter`: Add support for batching traces (#8995)

- `tanzuobservabilityexporter`: Use resourcetotelemetry helper (#8338)
- `cmd/mdatagen`: Add resource attributes definition to metadata.yaml and move `pdata.Metrics` creation to the
193 changes: 189 additions & 4 deletions exporter/splunkhecexporter/client.go
@@ -122,12 +122,43 @@ func (c *client) pushTraceData(
c.wg.Add(1)
defer c.wg.Done()

-	splunkEvents, _ := traceDataToSplunk(c.logger, td, c.config)
-	if len(splunkEvents) == 0 {
-		return nil
-	}
-
-	return c.sendSplunkEvents(ctx, splunkEvents)
+	gzipWriter := c.zippers.Get().(*gzip.Writer)
+	defer c.zippers.Put(gzipWriter)
+
+	gzipBuffer := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthLogs))
+	gzipWriter.Reset(gzipBuffer)
+
+	// Callback when each batch is to be sent.
+	send := func(ctx context.Context, buf *bytes.Buffer) (err error) {
+		localHeaders := map[string]string{}
+		if td.ResourceSpans().Len() != 0 {
+			accessToken, found := td.ResourceSpans().At(0).Resource().Attributes().Get(splunk.HecTokenLabel)
+			if found {
+				localHeaders["Authorization"] = splunk.HECTokenHeader + " " + accessToken.StringVal()
+			}
+		}
+
+		shouldCompress := buf.Len() >= minCompressionLen && !c.config.DisableCompression
+
+		if shouldCompress {
+			gzipBuffer.Reset()
+			gzipWriter.Reset(gzipBuffer)
+
+			if _, err = io.Copy(gzipWriter, buf); err != nil {
+				return fmt.Errorf("failed copying buffer to gzip writer: %v", err)
+			}
+
+			if err = gzipWriter.Close(); err != nil {
+				return fmt.Errorf("failed flushing compressed data to gzip writer: %v", err)
+			}
+
+			return c.postEvents(ctx, gzipBuffer, localHeaders, shouldCompress)
+		}
+
+		return c.postEvents(ctx, buf, localHeaders, shouldCompress)
+	}
+
+	return c.pushTracesDataInBatches(ctx, td, send)
}
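The rewritten pushTraceData wires batching through a send callback: each flushed buffer picks up a per-request Authorization header from the first resource's HEC token attribute (when present) and is gzipped only once it crosses a size threshold. The standalone sketch below isolates that compression decision; the maybeCompress helper is hypothetical, and minCompressionLen's value of 1500 is an assumption, not taken from the exporter.

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"sync"
)

// Threshold below which gzip costs more than it saves. The exporter defines
// its own minCompressionLen; 1500 here is an assumed value.
const minCompressionLen = 1500

// Pool of reusable gzip writers, like the exporter's c.zippers.
var zippers = sync.Pool{New: func() interface{} {
	return gzip.NewWriter(nil)
}}

// maybeCompress (hypothetical helper) returns the body to POST and whether it
// was gzipped, compressing only when the payload is large enough.
func maybeCompress(buf *bytes.Buffer, disableCompression bool) (io.Reader, bool, error) {
	if disableCompression || buf.Len() < minCompressionLen {
		return buf, false, nil
	}

	gzipWriter := zippers.Get().(*gzip.Writer)
	defer zippers.Put(gzipWriter)

	gzipBuffer := new(bytes.Buffer)
	gzipWriter.Reset(gzipBuffer)

	if _, err := io.Copy(gzipWriter, buf); err != nil {
		return nil, false, fmt.Errorf("failed copying buffer to gzip writer: %w", err)
	}
	if err := gzipWriter.Close(); err != nil {
		return nil, false, fmt.Errorf("failed flushing gzip writer: %w", err)
	}
	return gzipBuffer, true, nil
}

func main() {
	payload := bytes.NewBuffer(bytes.Repeat([]byte(`{"event":"x"}`), 200))
	body, compressed, err := maybeCompress(payload, false)
	if err != nil {
		panic(err)
	}
	n, _ := io.Copy(io.Discard, body)
	fmt.Printf("compressed=%v bodyBytes=%d\n", compressed, n)
}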

func (c *client) sendSplunkEvents(ctx context.Context, splunkEvents []*splunk.Event) error {
@@ -399,6 +430,71 @@ func (c *client) pushMetricsRecords(ctx context.Context, mds pdata.ResourceMetri
return permanentErrors, nil
}

func (c *client) pushTracesData(ctx context.Context, tds pdata.ResourceSpansSlice, state *bufferState, send func(context.Context, *bytes.Buffer) error) (permanentErrors []error, sendingError error) {
res := tds.At(state.resource)
spans := res.ScopeSpans().At(state.library).Spans()
bufCap := int(c.config.MaxContentLengthTraces)

for k := 0; k < spans.Len(); k++ {
if state.bufFront == nil {
state.bufFront = &index{resource: state.resource, library: state.library, record: k}
}

// Parsing span record to Splunk event.
event := mapSpanToSplunkEvent(res.Resource(), spans.At(k), c.config, c.logger)
// JSON encoding event and writing to buffer.
b, err := jsoniter.Marshal(event)
if err != nil {
permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped span events: %v, error: %v", event, err)))
continue
}
state.buf.Write(b)

// Continue adding events to buffer up to capacity.
// A capacity of 0 is interpreted as unknown/unbounded, consistent with ContentLength in http.Request.
if state.buf.Len() <= bufCap || bufCap == 0 {
// Tracking length of event bytes below capacity in buffer.
state.bufLen = state.buf.Len()
continue
}

state.tmpBuf.Reset()
// Storing event bytes over capacity in buffer before truncating.
if bufCap > 0 {
if over := state.buf.Len() - state.bufLen; over <= bufCap {
state.tmpBuf.Write(state.buf.Bytes()[state.bufLen:state.buf.Len()])
} else {
permanentErrors = append(permanentErrors, consumererror.NewPermanent(
fmt.Errorf("dropped span event: %s, error: event size %d bytes larger than configured max content length %d bytes", string(state.buf.Bytes()[state.bufLen:state.buf.Len()]), over, bufCap)))
}
}

// Truncating buffer at tracked length below capacity and sending.
state.buf.Truncate(state.bufLen)
if state.buf.Len() > 0 {
if err := send(ctx, state.buf); err != nil {
return permanentErrors, err
}
}
state.buf.Reset()

// Writing truncated bytes back to buffer.
state.tmpBuf.WriteTo(state.buf)

if state.buf.Len() > 0 {
// This means the current record overflowed the buffer and was not sent
state.bufFront = &index{resource: state.resource, library: state.library, record: k}
} else {
// This means that the entire buffer was sent, including the current record
state.bufFront = nil
}

state.bufLen = state.buf.Len()
}

return permanentErrors, nil
}
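pushTracesData grows a single buffer event by event, remembers the last length that still fit under the cap (state.bufLen), and on overflow stashes the overflowing bytes in tmpBuf, truncates, sends, and carries the stash into the next batch. Here is a minimal stdlib-only sketch of that bookkeeping; batchEvents is illustrative, though its maxLen == 0 "unbounded" convention mirrors the code above.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// batchEvents appends JSON-encoded events to a buffer and flushes it whenever
// appending an event pushes it past maxLen, carrying the overflowing bytes
// into the next batch — a simplified version of the bufLen/tmpBuf bookkeeping
// above. maxLen == 0 means unbounded, matching ContentLength semantics.
func batchEvents(events []interface{}, maxLen int, send func([]byte) error) error {
	var buf, tmp bytes.Buffer
	lastLen := 0 // length of buf containing only complete, under-capacity events

	for _, ev := range events {
		b, err := json.Marshal(ev)
		if err != nil {
			continue // a real exporter records this as a permanent error
		}
		buf.Write(b)

		if maxLen == 0 || buf.Len() <= maxLen {
			lastLen = buf.Len()
			continue
		}

		// Stash the overflowing event for the next batch, unless it alone
		// exceeds the cap — then it is dropped (the exporter records a
		// permanent error in that case).
		tmp.Reset()
		if over := buf.Len() - lastLen; over <= maxLen {
			tmp.Write(buf.Bytes()[lastLen:])
		}

		buf.Truncate(lastLen)
		if buf.Len() > 0 {
			if err := send(buf.Bytes()); err != nil {
				return err
			}
		}
		buf.Reset()
		tmp.WriteTo(&buf)
		lastLen = buf.Len()
	}

	if buf.Len() > 0 {
		return send(buf.Bytes()) // leftover partial batch
	}
	return nil
}

func main() {
	evs := []interface{}{
		map[string]string{"event": "a"},
		map[string]string{"event": "b"},
		map[string]string{"event": "c"},
	}
	_ = batchEvents(evs, 30, func(b []byte) error {
		fmt.Printf("batch: %s\n", b)
		return nil
	})
	// batch: {"event":"a"}{"event":"b"}
	// batch: {"event":"c"}
}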

// pushMetricsDataInBatches sends batches of Splunk events in JSON format.
// The batch content length is restricted to MaxContentLengthMetrics.
// md metrics are parsed to Splunk events.
@@ -434,6 +530,41 @@ func (c *client) pushMetricsDataInBatches(ctx context.Context, md pdata.Metrics,
return multierr.Combine(permanentErrors...)
}

// pushTracesDataInBatches sends batches of Splunk events in JSON format.
// The batch content length is restricted to MaxContentLengthTraces.
// td traces are parsed to Splunk events.
func (c *client) pushTracesDataInBatches(ctx context.Context, td pdata.Traces, send func(context.Context, *bytes.Buffer) error) error {
var bufState = makeBlankBufferState(c.config.MaxContentLengthTraces)
var permanentErrors []error

var rts = td.ResourceSpans()
for i := 0; i < rts.Len(); i++ {
ilts := rts.At(i).ScopeSpans()
for j := 0; j < ilts.Len(); j++ {
var err error
var newPermanentErrors []error

bufState.resource, bufState.library = i, j
newPermanentErrors, err = c.pushTracesData(ctx, rts, &bufState, send)

if err != nil {
return consumererror.NewTraces(err, *subTraces(&td, bufState.bufFront))
}

permanentErrors = append(permanentErrors, newPermanentErrors...)
}
}

// There are some leftover unsent traces
if bufState.buf.Len() > 0 {
if err := send(ctx, bufState.buf); err != nil {
return consumererror.NewTraces(err, *subTraces(&td, bufState.bufFront))
}
}

return multierr.Combine(permanentErrors...)
}
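Note the two error channels in pushTracesDataInBatches: marshaling failures become permanent errors that are collected and combined at the end (via multierr.Combine), while a transport failure aborts immediately and wraps the unsent remainder (consumererror.NewTraces plus subTraces) so it can be retried. A rough stdlib-only analogue, with errors.Join standing in for multierr and a toy permanentError type standing in for consumererror.NewPermanent:

package main

import (
	"errors"
	"fmt"
)

// permanentError marks data that can never be delivered (e.g. it failed to
// marshal), so it is dropped rather than retried.
type permanentError struct{ err error }

func (p permanentError) Error() string { return "permanent: " + p.err.Error() }

// pushInBatches mimics the control flow above: permanent errors accumulate
// and are combined at the end, while a transport error aborts immediately so
// the caller can retry the unsent remainder.
func pushInBatches(batches [][]string, send func([]string) error) error {
	var permanent []error
	for i, batch := range batches {
		var good []string
		for _, ev := range batch {
			if ev == "" { // stand-in for a marshaling failure
				permanent = append(permanent, permanentError{fmt.Errorf("empty event in batch %d", i)})
				continue
			}
			good = append(good, ev)
		}
		if err := send(good); err != nil {
			return fmt.Errorf("batch %d and later not sent, retry them: %w", i, err)
		}
	}
	return errors.Join(permanent...) // nil when no permanent errors occurred
}

func main() {
	batches := [][]string{{"a", ""}, {"b"}}
	err := pushInBatches(batches, func(evs []string) error {
		fmt.Println("sent:", evs)
		return nil
	})
	fmt.Println("result:", err)
}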

func (c *client) postEvents(ctx context.Context, events io.Reader, headers map[string]string, compressed bool) error {
req, err := http.NewRequestWithContext(ctx, "POST", c.url.String(), events)
if err != nil {
@@ -493,6 +624,18 @@ func subMetrics(md *pdata.Metrics, bufFront *index) *pdata.Metrics {
return &subset
}

// subTraces returns a subset of `td` starting from `bufFront`. `bufFront` can be nil, in which case it is ignored
func subTraces(td *pdata.Traces, bufFront *index) *pdata.Traces {
if td == nil {
return td
}

subset := pdata.NewTraces()
subTracesByType(td, bufFront, &subset)

return &subset
}

func subLogsByType(src *pdata.Logs, from *index, dst *pdata.Logs, profiling bool) {
if from == nil {
return // All the data of this type was sent successfully
@@ -582,6 +725,48 @@ func subMetricsByType(src *pdata.Metrics, from *index, dst *pdata.Metrics) {
}
}

func subTracesByType(src *pdata.Traces, from *index, dst *pdata.Traces) {
if from == nil {
return // All the data of this type was sent successfully
}

resources := src.ResourceSpans()
resourcesSub := dst.ResourceSpans()

for i := from.resource; i < resources.Len(); i++ {
newSub := resourcesSub.AppendEmpty()
resources.At(i).Resource().CopyTo(newSub.Resource())

libraries := resources.At(i).ScopeSpans()
librariesSub := newSub.ScopeSpans()

j := 0
if i == from.resource {
j = from.library
}
for jSub := 0; j < libraries.Len(); j++ {
lib := libraries.At(j)

newLibSub := librariesSub.AppendEmpty()
lib.Scope().CopyTo(newLibSub.Scope())

traces := lib.Spans()
tracesSub := newLibSub.Spans()
jSub++

k := 0
if i == from.resource && j == from.library {
k = from.record
}

for kSub := 0; k < traces.Len(); k++ { //revive:disable-line:var-naming
traces.At(k).CopyTo(tracesSub.AppendEmpty())
kSub++
}
}
}
}
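subTracesByType walks the (resource, library, record) cursor: the cursor's own scope contributes only records from from.record onward, later scopes in the same resource are copied whole, and so are all later resources. The following sketch performs the same walk over plain nested slices instead of pdata; the subset helper and string payloads are illustrative.

package main

import "fmt"

// index mirrors the exporter's (resource, library, record) cursor.
type index struct{ resource, library, record int }

// subset returns everything in src from the cursor onward: the tail of the
// record dimension for the cursor's own scope, then whole scopes and whole
// resources after it.
func subset(src [][][]string, from *index) [][][]string {
	if from == nil {
		return nil // everything was sent successfully
	}
	var dst [][][]string
	for i := from.resource; i < len(src); i++ {
		j := 0
		if i == from.resource {
			j = from.library
		}
		var scopes [][]string
		for ; j < len(src[i]); j++ {
			k := 0
			if i == from.resource && j == from.library {
				k = from.record
			}
			// Copy the remaining records of this scope.
			scopes = append(scopes, append([]string(nil), src[i][j][k:]...))
		}
		dst = append(dst, scopes)
	}
	return dst
}

func main() {
	src := [][][]string{
		{{"r0s0span0", "r0s0span1"}, {"r0s1span0"}},
		{{"r1s0span0"}},
	}
	fmt.Println(subset(src, &index{resource: 0, library: 0, record: 1}))
	// [[[r0s0span1] [r0s1span0]] [[r1s0span0]]]
}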

func encodeBodyEvents(zippers *sync.Pool, evs []*splunk.Event, disableCompression bool) (bodyReader io.Reader, compressed bool, err error) {
buf := new(bytes.Buffer)
for _, e := range evs {