diff --git a/pkg/logql/log/parser.go b/pkg/logql/log/parser.go
index fbd43864d4d68..9dabba488ab02 100644
--- a/pkg/logql/log/parser.go
+++ b/pkg/logql/log/parser.go
@@ -315,14 +315,18 @@ func (j *JSONExpressionParser) Process(line []byte, lbs *LabelsBuilder) ([]byte,
 
 func (j *JSONExpressionParser) RequiredLabelNames() []string { return []string{} }
 
-type UnpackParser struct{}
+type UnpackParser struct {
+	lbsBuffer []string
+}
 
 // NewUnpackParser creates a new unpack stage.
 // The unpack stage will parse a json log line as map[string]string where each key will be translated into labels.
 // A special key _entry will also be used to replace the original log line. This is to be used in conjunction with Promtail pack stage.
 // see https://grafana.com/docs/loki/latest/clients/promtail/stages/pack/
 func NewUnpackParser() *UnpackParser {
-	return &UnpackParser{}
+	return &UnpackParser{
+		lbsBuffer: make([]string, 0, 16),
+	}
 }
 
 func (UnpackParser) RequiredLabelNames() []string { return []string{} }
@@ -331,10 +335,11 @@ func (u *UnpackParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
 	if lbs.ParserLabelHints().NoLabels() {
 		return line, true
 	}
+	u.lbsBuffer = u.lbsBuffer[:0]
 	it := jsoniter.ConfigFastest.BorrowIterator(line)
 	defer jsoniter.ConfigFastest.ReturnIterator(it)
 
-	entry, err := u.unpack(it, lbs)
+	entry, err := u.unpack(it, line, lbs)
 	if err != nil {
 		lbs.SetErr(errJSON)
 		return line, true
@@ -342,12 +347,12 @@ func (u *UnpackParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
 	return entry, true
 }
 
-func (u *UnpackParser) unpack(it *jsoniter.Iterator, lbs *LabelsBuilder) ([]byte, error) {
+func (u *UnpackParser) unpack(it *jsoniter.Iterator, entry []byte, lbs *LabelsBuilder) ([]byte, error) {
 	// we only care about object and values.
 	if nextType := it.WhatIsNext(); nextType != jsoniter.ObjectValue {
 		return nil, fmt.Errorf("expecting json object(%d), got %d", jsoniter.ObjectValue, nextType)
 	}
-	var entry []byte
+	var isPacked bool
 	_ = it.ReadMapCB(func(iter *jsoniter.Iterator, field string) bool {
 		switch iter.WhatIsNext() {
 		case jsoniter.StringValue:
@@ -357,8 +362,8 @@ func (u *UnpackParser) unpack(it *jsoniter.Iterator, lbs *LabelsBuilder) ([]byte
 				// todo(ctovena): we should just reslice the original line since the property is contiguous
 				// but jsoniter doesn't allow us to do this right now.
 				// https://github.com/buger/jsonparser might do a better job at this.
-				entry = make([]byte, len(s))
-				copy(entry, s)
+				entry = append(entry[:0], s...)
+				isPacked = true
 				return true
 			}
 			if !lbs.ParserLabelHints().ShouldExtract(field) {
@@ -368,7 +373,8 @@ func (u *UnpackParser) unpack(it *jsoniter.Iterator, lbs *LabelsBuilder) ([]byte
 			if lbs.BaseHas(field) {
 				field = field + duplicateSuffix
 			}
-			lbs.Set(field, iter.ReadString())
+			// append to the buffer of labels
+			u.lbsBuffer = append(u.lbsBuffer, field, iter.ReadString())
 		default:
 			iter.Skip()
 		}
@@ -377,5 +383,11 @@ func (u *UnpackParser) unpack(it *jsoniter.Iterator, lbs *LabelsBuilder) ([]byte
 	if it.Error != nil && it.Error != io.EOF {
 		return nil, it.Error
 	}
+	// flush the buffer if we found a packed entry.
+	if isPacked {
+		for i := 0; i < len(u.lbsBuffer); i = i + 2 {
+			lbs.Set(u.lbsBuffer[i], u.lbsBuffer[i+1])
+		}
+	}
 	return entry, nil
 }
diff --git a/pkg/logql/log/parser_hints_test.go b/pkg/logql/log/parser_hints_test.go
index 988aa3b51524d..b32d9285206bd 100644
--- a/pkg/logql/log/parser_hints_test.go
+++ b/pkg/logql/log/parser_hints_test.go
@@ -29,12 +29,18 @@ var (
 		}
 	}`)
 
+	packedLine = []byte(`{
+		"remote_user": "foo",
+		"upstream_addr": "10.0.0.1:80",
+		"protocol": "HTTP/2.0",
+		"cluster": "us-east-west",
+		"_entry":"foo"
+	}`)
+
 	logfmtLine = []byte(`ts=2021-02-02T14:35:05.983992774Z caller=spanlogger.go:79 org_id=3677 traceID=2e5c7234b8640997 Ingester.TotalReached=15 Ingester.TotalChunksMatched=0 Ingester.TotalBatches=0`)
 )
 
 func Test_ParserHints(t *testing.T) {
-	lbs := labels.Labels{{Name: "app", Value: "nginx"}, {Name: "cluster", Value: "us-central-west"}}
-
 	t.Parallel()
 	for _, tt := range []struct {
 		expr string
@@ -185,14 +191,14 @@ func Test_ParserHints(t *testing.T) {
 		},
 		{
 			`sum by (cluster_extracted)(count_over_time({app="nginx"} | unpack | cluster_extracted="us-east-west" [1m]))`,
-			jsonLine,
+			packedLine,
 			true,
 			1.0,
 			`{cluster_extracted="us-east-west"}`,
 		},
 		{
 			`sum(rate({app="nginx"} | unpack | nonexistant_field="foo" [1m]))`,
-			jsonLine,
+			packedLine,
 			false,
 			0,
 			``,
@@ -200,7 +206,7 @@ func Test_ParserHints(t *testing.T) {
 	} {
 		tt := tt
 		t.Run(tt.expr, func(t *testing.T) {
-			t.Parallel()
+			lbs := labels.Labels{{Name: "app", Value: "nginx"}, {Name: "cluster", Value: "us-central-west"}}
 			expr, err := logql.ParseSampleExpr(tt.expr)
 			require.NoError(t, err)
 
diff --git a/pkg/logql/log/parser_test.go b/pkg/logql/log/parser_test.go
index 97a764d72a095..2b3dbcb782ba9 100644
--- a/pkg/logql/log/parser_test.go
+++ b/pkg/logql/log/parser_test.go
@@ -689,6 +689,19 @@ func Test_unpackParser_Parse(t *testing.T) {
 			},
 			[]byte(`some message`),
 		},
+		{
+			"should not change log and labels if no packed entry",
+			[]byte(`{"bar":1,"app":"foo","namespace":"prod","pod":{"uid":"1"}}`),
+			labels.Labels{
+				{Name: "app", Value: "bar"},
+				{Name: "cluster", Value: "us-central1"},
+			},
+			labels.Labels{
+				{Name: "app", Value: "bar"},
+				{Name: "cluster", Value: "us-central1"},
+			},
+			[]byte(`{"bar":1,"app":"foo","namespace":"prod","pod":{"uid":"1"}}`),
+		},
 	}
 	for _, tt := range tests {
 		j := NewUnpackParser()
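Note on the behavioural core of this change: extracted labels are now staged in the reusable `lbsBuffer` while the JSON object is scanned and only committed to the `LabelsBuilder` once a packed `_entry` key has been seen, so a plain (non-packed) JSON line no longer touches the label set, and the `_entry` copy reuses the input line's buffer instead of allocating. Below is a minimal standalone sketch of that stage-then-flush pattern; it uses `encoding/json` instead of jsoniter, and the names (`unpackSketch`, `set`) are illustrative only, not Loki's actual types.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// unpackSketch scans a JSON object. String fields are staged in a reusable
// flat buffer (key, value, key, value, ...). Only when a special "_entry"
// key is present are the staged pairs committed via the set callback and the
// returned line replaced; otherwise the input line is returned untouched.
func unpackSketch(line []byte, buf []string, set func(k, v string)) ([]byte, []string, error) {
	var fields map[string]json.RawMessage
	if err := json.Unmarshal(line, &fields); err != nil {
		return line, buf[:0], err
	}

	buf = buf[:0] // reuse the caller's buffer across calls
	entry := line
	isPacked := false

	for k, raw := range fields {
		var s string
		if err := json.Unmarshal(raw, &s); err != nil {
			continue // skip non-string values, mirroring the iter.Skip() branch
		}
		if k == "_entry" {
			entry = []byte(s)
			isPacked = true
			continue
		}
		buf = append(buf, k, s)
	}

	// Flush the staged labels only if a packed entry was found, so a plain
	// (non-packed) JSON line leaves the label set untouched.
	if isPacked {
		for i := 0; i < len(buf); i += 2 {
			set(buf[i], buf[i+1])
		}
	}
	return entry, buf, nil
}

func main() {
	labels := map[string]string{}
	set := func(k, v string) { labels[k] = v }

	var buf []string
	out, buf, _ := unpackSketch([]byte(`{"pod":"nginx-1","_entry":"some message"}`), buf, set)
	fmt.Printf("packed:   line=%q labels=%v\n", out, labels)

	out, _, _ = unpackSketch([]byte(`{"bar":"1","app":"foo"}`), buf, set)
	fmt.Printf("unpacked: line=%q labels=%v\n", out, labels) // no flush: labels untouched by this call
}
```

Returning `buf` lets the caller keep reusing the same backing array across lines, which is what the `lbsBuffer` struct field accomplishes in the parser above.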