Skip to content

Commit

Permalink
Only unpack entry if the key _entry exists. (#3426)
Browse files Browse the repository at this point in the history
* Only unpack entry if the key `_entry` exists.

Signed-off-by: Cyril Tovena <[email protected]>

* s/kv/lbs/g

Signed-off-by: Cyril Tovena <[email protected]>

* fix the test.

Signed-off-by: Cyril Tovena <[email protected]>
  • Loading branch information
cyriltovena authored Mar 3, 2021
1 parent 8c98ee2 commit 32d668c
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 13 deletions.
28 changes: 20 additions & 8 deletions pkg/logql/log/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,14 +315,18 @@ func (j *JSONExpressionParser) Process(line []byte, lbs *LabelsBuilder) ([]byte,

func (j *JSONExpressionParser) RequiredLabelNames() []string { return []string{} }

// UnpackParser is a stage that unpacks a JSON log line produced by the
// Promtail pack stage: each string key/value pair is turned into a label,
// and the special _entry key replaces the original log line.
type UnpackParser struct {
	// lbsBuffer is a reusable scratch buffer of alternating key/value pairs
	// collected while scanning; pairs are flushed into the LabelsBuilder
	// only once a packed _entry key has actually been found.
	lbsBuffer []string
}

// NewUnpackParser creates a new unpack stage.
// The unpack stage will parse a json log line as map[string]string where each key will be translated into labels.
// A special key _entry will also be used to replace the original log line. This is to be used in conjunction with Promtail pack stage.
// see https://grafana.com/docs/loki/latest/clients/promtail/stages/pack/
func NewUnpackParser() *UnpackParser {
	return &UnpackParser{
		// Pre-size the scratch buffer so typical lines extract their
		// key/value pairs without reallocating.
		lbsBuffer: make([]string, 0, 16),
	}
}

// RequiredLabelNames reports which stream labels this stage needs before it
// can run; unpack inspects only the log line itself, so none are required.
// Note: callers receive a non-nil empty slice, matching the other parsers.
func (UnpackParser) RequiredLabelNames() []string {
	names := make([]string, 0)
	return names
}
Expand All @@ -331,23 +335,24 @@ func (u *UnpackParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
if lbs.ParserLabelHints().NoLabels() {
return line, true
}
u.lbsBuffer = u.lbsBuffer[:0]
it := jsoniter.ConfigFastest.BorrowIterator(line)
defer jsoniter.ConfigFastest.ReturnIterator(it)

entry, err := u.unpack(it, lbs)
entry, err := u.unpack(it, line, lbs)
if err != nil {
lbs.SetErr(errJSON)
return line, true
}
return entry, true
}

func (u *UnpackParser) unpack(it *jsoniter.Iterator, lbs *LabelsBuilder) ([]byte, error) {
func (u *UnpackParser) unpack(it *jsoniter.Iterator, entry []byte, lbs *LabelsBuilder) ([]byte, error) {
// we only care about object and values.
if nextType := it.WhatIsNext(); nextType != jsoniter.ObjectValue {
return nil, fmt.Errorf("expecting json object(%d), got %d", jsoniter.ObjectValue, nextType)
}
var entry []byte
var isPacked bool
_ = it.ReadMapCB(func(iter *jsoniter.Iterator, field string) bool {
switch iter.WhatIsNext() {
case jsoniter.StringValue:
Expand All @@ -357,8 +362,8 @@ func (u *UnpackParser) unpack(it *jsoniter.Iterator, lbs *LabelsBuilder) ([]byte
// todo(ctovena): we should just reslice the original line since the property is contiguous
// but jsoniter doesn't allow us to do this right now.
// https://github.com/buger/jsonparser might do a better job at this.
entry = make([]byte, len(s))
copy(entry, s)
entry = append(entry[:0], s...)
isPacked = true
return true
}
if !lbs.ParserLabelHints().ShouldExtract(field) {
Expand All @@ -368,7 +373,8 @@ func (u *UnpackParser) unpack(it *jsoniter.Iterator, lbs *LabelsBuilder) ([]byte
if lbs.BaseHas(field) {
field = field + duplicateSuffix
}
lbs.Set(field, iter.ReadString())
// append to the buffer of labels
u.lbsBuffer = append(u.lbsBuffer, field, iter.ReadString())
default:
iter.Skip()
}
Expand All @@ -377,5 +383,11 @@ func (u *UnpackParser) unpack(it *jsoniter.Iterator, lbs *LabelsBuilder) ([]byte
if it.Error != nil && it.Error != io.EOF {
return nil, it.Error
}
// flush the buffer if we found a packed entry.
if isPacked {
for i := 0; i < len(u.lbsBuffer); i = i + 2 {
lbs.Set(u.lbsBuffer[i], u.lbsBuffer[i+1])
}
}
return entry, nil
}
16 changes: 11 additions & 5 deletions pkg/logql/log/parser_hints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,18 @@ var (
}
}`)

packedLine = []byte(`{
"remote_user": "foo",
"upstream_addr": "10.0.0.1:80",
"protocol": "HTTP/2.0",
"cluster": "us-east-west",
"_entry":"foo"
}`)

logfmtLine = []byte(`ts=2021-02-02T14:35:05.983992774Z caller=spanlogger.go:79 org_id=3677 traceID=2e5c7234b8640997 Ingester.TotalReached=15 Ingester.TotalChunksMatched=0 Ingester.TotalBatches=0`)
)

func Test_ParserHints(t *testing.T) {
lbs := labels.Labels{{Name: "app", Value: "nginx"}, {Name: "cluster", Value: "us-central-west"}}

t.Parallel()
for _, tt := range []struct {
expr string
Expand Down Expand Up @@ -185,22 +191,22 @@ func Test_ParserHints(t *testing.T) {
},
{
`sum by (cluster_extracted)(count_over_time({app="nginx"} | unpack | cluster_extracted="us-east-west" [1m]))`,
jsonLine,
packedLine,
true,
1.0,
`{cluster_extracted="us-east-west"}`,
},
{
`sum(rate({app="nginx"} | unpack | nonexistant_field="foo" [1m]))`,
jsonLine,
packedLine,
false,
0,
``,
},
} {
tt := tt
t.Run(tt.expr, func(t *testing.T) {
t.Parallel()
lbs := labels.Labels{{Name: "app", Value: "nginx"}, {Name: "cluster", Value: "us-central-west"}}
expr, err := logql.ParseSampleExpr(tt.expr)
require.NoError(t, err)

Expand Down
13 changes: 13 additions & 0 deletions pkg/logql/log/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,19 @@ func Test_unpackParser_Parse(t *testing.T) {
},
[]byte(`some message`),
},
{
"should not change log and labels if no packed entry",
[]byte(`{"bar":1,"app":"foo","namespace":"prod","pod":{"uid":"1"}}`),
labels.Labels{
{Name: "app", Value: "bar"},
{Name: "cluster", Value: "us-central1"},
},
labels.Labels{
{Name: "app", Value: "bar"},
{Name: "cluster", Value: "us-central1"},
},
[]byte(`{"bar":1,"app":"foo","namespace":"prod","pod":{"uid":"1"}}`),
},
}
for _, tt := range tests {
j := NewUnpackParser()
Expand Down

0 comments on commit 32d668c

Please sign in to comment.