From db6f7e42648f8be6667cbc21de388edb1bc30974 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Tue, 27 Sep 2022 12:18:21 -0700 Subject: [PATCH] [chore] remove unnecessary allocations in fluentforwardreceiver Signed-off-by: Bogdan Drutu --- receiver/fluentforwardreceiver/collector.go | 31 ++++++++------------- 1 file changed, 12 insertions(+), 19 deletions(-) diff --git a/receiver/fluentforwardreceiver/collector.go b/receiver/fluentforwardreceiver/collector.go index db9e17b3893f..43598f89fd49 100644 --- a/receiver/fluentforwardreceiver/collector.go +++ b/receiver/fluentforwardreceiver/collector.go @@ -53,35 +53,28 @@ func (c *Collector) processEvents(ctx context.Context) { case <-ctx.Done(): return case e := <-c.eventCh: - buffered := []Event{e} + out := plog.NewLogs() + rls := out.ResourceLogs().AppendEmpty() + logSlice := rls.ScopeLogs().AppendEmpty().LogRecords() + e.LogRecords().MoveAndAppendTo(logSlice) + // Pull out anything waiting on the eventCh to get better // efficiency on LogResource allocations. - buffered = fillBufferUntilChanEmpty(c.eventCh, buffered) + c.fillBufferUntilChanEmpty(logSlice) - logs := collectLogRecords(buffered) - _ = c.nextConsumer.ConsumeLogs(ctx, logs) + stats.Record(context.Background(), observ.RecordsGenerated.M(int64(out.LogRecordCount()))) + _ = c.nextConsumer.ConsumeLogs(ctx, out) } } } -func fillBufferUntilChanEmpty(eventCh <-chan Event, buf []Event) []Event { +func (c *Collector) fillBufferUntilChanEmpty(dest plog.LogRecordSlice) { for { select { - case e2 := <-eventCh: - buf = append(buf, e2) + case e := <-c.eventCh: + e.LogRecords().MoveAndAppendTo(dest) default: - return buf + return } } } - -func collectLogRecords(events []Event) plog.Logs { - out := plog.NewLogs() - rls := out.ResourceLogs().AppendEmpty() - logSlice := rls.ScopeLogs().AppendEmpty().LogRecords() - for i := range events { - events[i].LogRecords().MoveAndAppendTo(logSlice) - } - stats.Record(context.Background(), observ.RecordsGenerated.M(int64(out.LogRecordCount()))) - return out -}