From af5f7c8738fc6eaf3243285e128ebba5850e97d2 Mon Sep 17 00:00:00 2001
From: Trevor Whitney <trevorjwhitney@gmail.com>
Date: Fri, 6 Sep 2024 13:05:23 -0600
Subject: [PATCH] fix: correct _extracted logic in detected fields (#14064)

Co-authored-by: Sven Grossmann <svennergr@gmail.com>
(cherry picked from commit 1b3ba530b8fab9aac999387a135a76a62de3e000)
---
 pkg/querier/querier.go           | 135 +++++---
 pkg/querier/querier_mock_test.go |  64 +++-
 pkg/querier/querier_test.go      | 572 +++++++++++++++++++++++++++++++
 3 files changed, 704 insertions(+), 67 deletions(-)

diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go
index cb8086564752d..2046763e6973f 100644
--- a/pkg/querier/querier.go
+++ b/pkg/querier/querier.go
@@ -1102,9 +1102,8 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto.
 	if err != nil {
 		return nil, err
 	}
-	parsers := getParsersFromExpr(expr)
 
-	detectedFields := parseDetectedFields(req.FieldLimit, streams, parsers)
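+	// The parser for each detected field is determined from the log lines
+	// themselves inside parseDetectedFields.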
+	detectedFields := parseDetectedFields(req.FieldLimit, streams)
 
 	fields := make([]*logproto.DetectedField, len(detectedFields))
 	fieldCount := 0
@@ -1220,7 +1219,7 @@ func determineType(value string) logproto.DetectedFieldType {
 	return logproto.DetectedFieldString
 }
 
-func parseDetectedFields(limit uint32, streams logqlmodel.Streams, queryParsers []string) map[string]*parsedFields {
+func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*parsedFields {
 	detectedFields := make(map[string]*parsedFields, limit)
 	fieldCount := uint32(0)
 	emtpyparsers := []string{}
@@ -1258,11 +1257,8 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams, queryParsers
 				}
 			}
 
-			parsers := queryParsers
-			parsedLabels := getParsedLabels(entry)
-			if len(parsedLabels) == 0 {
-				parsedLabels, parsers = parseLine(entry.Line, streamLbls)
-			}
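+			// parseEntry merges the entry's pre-parsed labels with labels parsed
+			// from the line itself (JSON first, then logfmt).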
+			streamLbls := logql_log.NewBaseLabelsBuilder().ForLabels(streamLbls, streamLbls.Hash())
+			parsedLabels, parsers := parseEntry(entry, streamLbls)
 			for k, vals := range parsedLabels {
 				df, ok := detectedFields[k]
 				if !ok && fieldCount < limit {
@@ -1299,9 +1295,9 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams, queryParsers
 	return detectedFields
 }
 
-func getParsedLabels(entry push.Entry) map[string][]string {
+func getStructuredMetadata(entry push.Entry) map[string][]string {
 	labels := map[string]map[string]struct{}{}
-	for _, lbl := range entry.Parsed {
+	for _, lbl := range entry.StructuredMetadata {
 		if values, ok := labels[lbl.Name]; ok {
 			values[lbl.Value] = struct{}{}
 		} else {
@@ -1321,50 +1317,50 @@ func getParsedLabels(entry push.Entry) map[string][]string {
 	return result
 }
 
-func getStructuredMetadata(entry push.Entry) map[string][]string {
-	labels := map[string]map[string]struct{}{}
-	for _, lbl := range entry.StructuredMetadata {
-		if values, ok := labels[lbl.Name]; ok {
-			values[lbl.Value] = struct{}{}
-		} else {
-			labels[lbl.Name] = map[string]struct{}{lbl.Value: {}}
-		}
-	}
+func parseEntry(entry push.Entry, lbls *logql_log.LabelsBuilder) (map[string][]string, []string) {
+	origParsed := getParsedLabels(entry)
+	parsed := make(map[string][]string, len(origParsed))
 
-	result := make(map[string][]string, len(labels))
-	for lbl, values := range labels {
-		vals := make([]string, 0, len(values))
-		for v := range values {
-			vals = append(vals, v)
+	for lbl, values := range origParsed {
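+		// skip LogQL error meta-labels; they are not detected fields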
+		if lbl == logqlmodel.ErrorLabel || lbl == logqlmodel.ErrorDetailsLabel ||
+			lbl == logqlmodel.PreserveErrorLabel {
+			continue
 		}
-		result[lbl] = vals
-	}
-
-	return result
-}
 
-func parseLine(line string, streamLbls labels.Labels) (map[string][]string, []string) {
-	parser := "logfmt"
-	logFmtParser := logql_log.NewLogfmtParser(true, false)
+		parsed[lbl] = values
+	}
 
-	lbls := logql_log.NewBaseLabelsBuilder().ForLabels(streamLbls, 0)
-	_, logfmtSuccess := logFmtParser.Process(0, []byte(line), lbls)
-	if !logfmtSuccess || lbls.HasErr() {
-		parser = "json"
-		jsonParser := logql_log.NewJSONParser()
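+	// Try JSON first and fall back to logfmt; whichever parser succeeds is
+	// reported for the fields extracted from the line.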
+	line := entry.Line
+	parser := "json"
+	jsonParser := logql_log.NewJSONParser()
+	_, jsonSuccess := jsonParser.Process(0, []byte(line), lbls)
+	if !jsonSuccess || lbls.HasErr() {
 		lbls.Reset()
-		_, jsonSuccess := jsonParser.Process(0, []byte(line), lbls)
-		if !jsonSuccess || lbls.HasErr() {
-			return map[string][]string{}, nil
+
+		logFmtParser := logql_log.NewLogfmtParser(false, false)
+		parser = "logfmt"
+		_, logfmtSuccess := logFmtParser.Process(0, []byte(line), lbls)
+		if !logfmtSuccess || lbls.HasErr() {
+			return parsed, nil
 		}
 	}
 
 	parsedLabels := map[string]map[string]struct{}{}
-	for _, lbl := range lbls.LabelsResult().Labels() {
-		// skip indexed labels, as we only want detected fields
-		if streamLbls.Has(lbl.Name) {
-			continue
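+	// seed the result with the entry's pre-parsed labels before adding
+	// anything extracted from the line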
+	for lbl, values := range parsed {
+		if vals, ok := parsedLabels[lbl]; ok {
+			for _, value := range values {
+				vals[value] = struct{}{}
+			}
+		} else {
+			parsedLabels[lbl] = map[string]struct{}{}
+			for _, value := range values {
+				parsedLabels[lbl][value] = struct{}{}
+			}
 		}
+	}
+
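+	// Parsed() returns only labels added by the parser run, never the
+	// indexed stream labels.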
+	lblsResult := lbls.LabelsResult().Parsed()
+	for _, lbl := range lblsResult {
 		if values, ok := parsedLabels[lbl.Name]; ok {
 			values[lbl.Value] = struct{}{}
 		} else {
@@ -1374,6 +1370,10 @@ func parseLine(line string, streamLbls labels.Labels) (map[string][]string, []st
 
 	result := make(map[string][]string, len(parsedLabels))
 	for lbl, values := range parsedLabels {
+		if lbl == logqlmodel.ErrorLabel || lbl == logqlmodel.ErrorDetailsLabel ||
+			lbl == logqlmodel.PreserveErrorLabel {
+			continue
+		}
 		vals := make([]string, 0, len(values))
 		for v := range values {
 			vals = append(vals, v)
@@ -1384,10 +1384,29 @@ func parseLine(line string, streamLbls labels.Labels) (map[string][]string, []st
 	return result, []string{parser}
 }
 
+func getParsedLabels(entry push.Entry) map[string][]string {
+	labels := map[string]map[string]struct{}{}
+	for _, lbl := range entry.Parsed {
+		if values, ok := labels[lbl.Name]; ok {
+			values[lbl.Value] = struct{}{}
+		} else {
+			labels[lbl.Name] = map[string]struct{}{lbl.Value: {}}
+		}
+	}
+
+	result := make(map[string][]string, len(labels))
+	for lbl, values := range labels {
+		vals := make([]string, 0, len(values))
+		for v := range values {
+			vals = append(vals, v)
+		}
+		result[lbl] = vals
+	}
+
+	return result
+}
+
 // streamsForFieldDetection reads the streams from the iterator and returns them sorted.
-// If categorizeLabels is true, the stream labels contains just the stream labels and entries inside each stream have their
-// structuredMetadata and parsed fields populated with structured metadata labels plus the parsed labels respectively.
-// Otherwise, the stream labels are the whole series labels including the stream labels, structured metadata labels and parsed labels.
 func streamsForFieldDetection(i iter.EntryIterator, size uint32) (logqlmodel.Streams, error) {
 	streams := map[string]*logproto.Stream{}
 	respSize := uint32(0)
@@ -1403,12 +1422,28 @@ func streamsForFieldDetection(i iter.EntryIterator, size uint32) (logqlmodel.Str
 		// If lastEntry.Unix < 0 this is the first pass through the loop and we should output the line.
 		// Then check to see if the entry is equal to, or past a forward step
 		if lastEntry.Unix() < 0 || shouldOutput {
-			stream, ok := streams[streamLabels]
+			allLbls, err := syntax.ParseLabels(streamLabels)
+			if err != nil {
+				continue
+			}
+
+			parsedLbls := logproto.FromLabelAdaptersToLabels(entry.Parsed)
+			structuredMetadata := logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata)
+
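+			// key the stream map by stream labels only: drop any label that also
+			// appears in the entry's parsed or structured metadata labels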
+			onlyStreamLbls := logql_log.NewBaseLabelsBuilder().ForLabels(allLbls, 0)
+			allLbls.Range(func(l labels.Label) {
+				if parsedLbls.Has(l.Name) || structuredMetadata.Has(l.Name) {
+					onlyStreamLbls.Del(l.Name)
+				}
+			})
+
+			lblStr := onlyStreamLbls.LabelsResult().String()
+			stream, ok := streams[lblStr]
 			if !ok {
 				stream = &logproto.Stream{
-					Labels: streamLabels,
+					Labels: lblStr,
 				}
-				streams[streamLabels] = stream
+				streams[lblStr] = stream
 			}
 			stream.Entries = append(stream.Entries, entry)
 			lastEntry = i.At().Timestamp
diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go
index 7f5cc24ff276c..6a5a45b5123da 100644
--- a/pkg/querier/querier_mock_test.go
+++ b/pkg/querier/querier_mock_test.go
@@ -8,6 +8,7 @@ import (
 	"time"
 
 	"github.com/grafana/loki/v3/pkg/logql/log"
+	"github.com/grafana/loki/v3/pkg/logql/syntax"
 
 	"github.com/grafana/loki/pkg/push"
 
@@ -16,6 +17,7 @@ import (
 	"github.com/grafana/dskit/grpcclient"
 	"github.com/grafana/dskit/ring"
 	ring_client "github.com/grafana/dskit/ring/client"
+	logql_log "github.com/grafana/loki/v3/pkg/logql/log"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/labels"
@@ -578,27 +580,41 @@ func mockLogfmtStream(from int, quantity int) logproto.Stream {
 	return mockLogfmtStreamWithLabels(from, quantity, `{type="test", name="foo"}`)
 }
 
-func mockLogfmtStreamWithLabels(_ int, quantity int, labels string) logproto.Stream {
+func mockLogfmtStreamWithLabels(_ int, quantity int, lbls string) logproto.Stream {
 	entries := make([]logproto.Entry, 0, quantity)
+	streamLabels, err := syntax.ParseLabels(lbls)
+	if err != nil {
+		streamLabels = labels.EmptyLabels()
+	}
+
+	lblBuilder := logql_log.NewBaseLabelsBuilder().ForLabels(streamLabels, streamLabels.Hash())
+	logFmtParser := logql_log.NewLogfmtParser(false, false)
 
 	// used for detected fields queries which are always BACKWARD
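+	// pre-parse each line so the mock mimics a response with categorized
+	// labels, where every entry carries the fields parsed from its own line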
 	for i := quantity; i > 0; i-- {
-		entries = append(entries, logproto.Entry{
+		line := fmt.Sprintf(
+			`message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t name=bar`,
+			i,
+			i,
+			(i * 10),
+			(i * 256),
+			float32(i*10.0),
+			(i%2 == 0))
+
+		entry := logproto.Entry{
 			Timestamp: time.Unix(int64(i), 0),
-			Line: fmt.Sprintf(
-				`message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t name=bar`,
-				i,
-				i,
-				(i * 10),
-				(i * 256),
-				float32(i*10.0),
-				(i%2 == 0)),
-		})
+			Line:      line,
+		}
+		_, logfmtSuccess := logFmtParser.Process(0, []byte(line), lblBuilder)
+		if logfmtSuccess {
+			entry.Parsed = logproto.FromLabelsToLabelAdapters(lblBuilder.LabelsResult().Parsed())
+		}
+		entries = append(entries, entry)
 	}
 
 	return logproto.Stream{
 		Entries: entries,
-		Labels:  labels,
+		Labels:  lblBuilder.LabelsResult().String(),
 	}
 }
 
@@ -609,7 +625,7 @@ func mockLogfmtStreamWithStructuredMetadata(from int, quantity int) logproto.Str
 func mockLogfmtStreamWithLabelsAndStructuredMetadata(
 	from int,
 	quantity int,
-	labels string,
+	lbls string,
 ) logproto.Stream {
 	var entries []logproto.Entry
 	metadata := push.LabelsAdapter{
@@ -626,15 +642,29 @@ func mockLogfmtStreamWithLabelsAndStructuredMetadata(
 		})
 	}
 
+	streamLabels, err := syntax.ParseLabels(lbls)
+	if err != nil {
+		streamLabels = labels.EmptyLabels()
+	}
+
+	lblBuilder := logql_log.NewBaseLabelsBuilder().ForLabels(streamLabels, streamLabels.Hash())
+	logFmtParser := logql_log.NewLogfmtParser(false, false)
+
 	for i := quantity; i > 0; i-- {
-		entries = append(entries, logproto.Entry{
+		line := fmt.Sprintf(`message="line %d" count=%d fake=true`, i, i)
+		entry := logproto.Entry{
 			Timestamp:          time.Unix(int64(i), 0),
-			Line:               fmt.Sprintf(`message="line %d" count=%d fake=true`, i, i),
+			Line:               line,
 			StructuredMetadata: metadata,
-		})
+		}
+		_, logfmtSuccess := logFmtParser.Process(0, []byte(line), lblBuilder)
+		if logfmtSuccess {
+			entry.Parsed = logproto.FromLabelsToLabelAdapters(lblBuilder.LabelsResult().Parsed())
+		}
+		entries = append(entries, entry)
 	}
 	return logproto.Stream{
-		Labels:  labels,
+		Labels:  lbls,
 		Entries: entries,
 	}
 }
diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go
index e542b28247cd1..d7dbaa61e5648 100644
--- a/pkg/querier/querier_test.go
+++ b/pkg/querier/querier_test.go
@@ -16,17 +16,21 @@ import (
 	ring_client "github.com/grafana/dskit/ring/client"
 	"github.com/grafana/dskit/user"
 	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/promql/parser"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 
 	util_log "github.com/grafana/loki/v3/pkg/util/log"
 
+	"github.com/grafana/loki/pkg/push"
+
 	"github.com/grafana/loki/v3/pkg/compactor/deletion"
 	"github.com/grafana/loki/v3/pkg/ingester/client"
 	"github.com/grafana/loki/v3/pkg/logproto"
 	"github.com/grafana/loki/v3/pkg/logql"
 	"github.com/grafana/loki/v3/pkg/logql/syntax"
+	"github.com/grafana/loki/v3/pkg/logqlmodel"
 	"github.com/grafana/loki/v3/pkg/querier/plan"
 	"github.com/grafana/loki/v3/pkg/storage"
 	"github.com/grafana/loki/v3/pkg/util/constants"
@@ -2096,3 +2100,571 @@ func Test_getParsersFromExpr(t *testing.T) {
 		assert.Equal(t, []string{"logfmt", "json"}, getParsersFromExpr(expr))
 	})
 }
+
+func Test_parseDetectedFields(t *testing.T) {
+	now := time.Now()
+
+	t.Run("when no parsers are supplied", func(t *testing.T) {
+		infoDetectedFieldMetadata := []push.LabelAdapter{
+			{
+				Name:  "detected_level",
+				Value: "info",
+			},
+		}
+
+		rulerLines := []push.Entry{
+			{Timestamp: now, Line: "ts=2024-09-05T15:36:38.757788067Z caller=grpc_logging.go:66 tenant=2419 level=info method=/cortex.Ingester/Push duration=19.098s msg=gRPC", StructuredMetadata: infoDetectdFiledMetadata},
+			{Timestamp: now, Line: "ts=2024-09-05T15:36:38.698375619Z caller=grpc_logging.go:66 tenant=29 level=info method=/cortex.Ingester/Push duration=5.471s msg=gRPC", StructuredMetadata: infoDetectdFiledMetadata},
+			{Timestamp: now, Line: "ts=2024-09-05T15:36:38.629424175Z caller=grpc_logging.go:66 tenant=2919 level=info method=/cortex.Ingester/Push duration=29.234s msg=gRPC", StructuredMetadata: infoDetectdFiledMetadata},
+		}
+
+		rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler"}`
+		rulerMetric, err := parser.ParseMetric(rulerLbls)
+		require.NoError(t, err)
+
+		rulerStream := push.Stream{
+			Labels:  rulerLbls,
+			Entries: rulerLines,
+			Hash:    rulerMetric.Hash(),
+		}
+
+		debugDetectedFieldMetadata := []push.LabelAdapter{
+			{
+				Name:  "detected_level",
+				Value: "debug",
+			},
+		}
+
+		nginxJSONLines := []push.Entry{
+			{Timestamp: now, Line: `{"host":"100.117.38.203", "user-identifier":"nader3722", "datetime":"05/Sep/2024:16:13:56 +0000", "method": "PATCH", "request": "/api/loki/v1/push", "protocol":"HTTP/2.0", "status":200, "bytes":9664, "referer": "https://www.seniorbleeding-edge.net/exploit/robust/whiteboard"}`, StructuredMetadata: debugDetectedFieldMetadata},
+			{Timestamp: now, Line: `{"host":"66.134.9.30", "user-identifier":"-", "datetime":"05/Sep/2024:16:13:55 +0000", "method": "DELETE", "request": "/api/mimir/v1/push", "protocol":"HTTP/1.1", "status":200, "bytes":18688, "referer": "https://www.districtiterate.biz/synergistic/next-generation/extend"}`, StructuredMetadata: debugDetectedFieldMetadata},
+			{Timestamp: now, Line: `{"host":"66.134.9.30", "user-identifier":"-", "datetime":"05/Sep/2024:16:13:55 +0000", "method": "GET", "request": "/api/loki/v1/label/names", "protocol":"HTTP/1.1", "status":200, "bytes":9314, "referer": "https://www.dynamicimplement.info/enterprise/distributed/incentivize/strategic"}`, StructuredMetadata: debugDetectedFieldMetadata},
+		}
+
+		nginxLbls := `{ cluster="eu-west-1", level="debug", namespace="gateway", pod="nginx-json-oghco", service_name="nginx-json" }`
+		nginxMetric, err := parser.ParseMetric(nginxLbls)
+		require.NoError(t, err)
+
+		nginxStream := push.Stream{
+			Labels:  nginxLbls,
+			Entries: nginxJSONLines,
+			Hash:    nginxMetric.Hash(),
+		}
+
+		t.Run("detect logfmt fields when with no supplied parsers", func(t *testing.T) {
+			df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{rulerStream}))
+			for _, expected := range []string{"ts", "caller", "tenant", "level", "method", "duration", "msg"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 1)
+				require.Equal(t, "logfmt", parsers[0])
+			}
+
+			// no parsers for structured metadata
+			for _, expected := range []string{"detected_level"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 0)
+			}
+		})
+
+		t.Run("detect json fields when with no supplied parsers", func(t *testing.T) {
+			df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{nginxStream}))
+			for _, expected := range []string{"host", "user_identifier", "datetime", "method", "request", "protocol", "status", "bytes", "referer"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 1)
+				require.Equal(t, "json", parsers[0])
+			}
+
+			// no parsers for structured metadata
+			for _, expected := range []string{"detected_level"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 0)
+			}
+		})
+
+		t.Run("detect mixed fields when with no supplied parsers", func(t *testing.T) {
+			df := parseDetectedFields(uint32(20), logqlmodel.Streams([]push.Stream{rulerStream, nginxStream}))
+
+			for _, expected := range []string{"ts", "caller", "tenant", "level", "duration", "msg"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 1, "expected only logfmt parser for %s", expected)
+				require.Equal(t, "logfmt", parsers[0], "expected only logfmt parser for %s", expected)
+			}
+
+			for _, expected := range []string{"host", "user_identifier", "datetime", "request", "protocol", "status", "bytes", "referer"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 1, "expected only json parser for %s", expected)
+				require.Equal(t, "json", parsers[0], "expected only json parser for %s", expected)
+			}
+
+			// multiple parsers for fields that exist in both streams
+			for _, expected := range []string{"method"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 2, "expected logfmt and json parser for %s", expected)
+				require.Contains(t, parsers, "logfmt", "expected logfmt parser for %s", expected)
+				require.Contains(t, parsers, "json", "expected json parser for %s", expected)
+			}
+
+			// no parsers for structured metadata
+			for _, expected := range []string{"detected_level"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 0)
+			}
+		})
+
+		t.Run("correctly applies _extracted for a single stream", func(t *testing.T) {
+			rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler", tenant="42", caller="inside-the-house"}`
+			rulerMetric, err := parser.ParseMetric(rulerLbls)
+			require.NoError(t, err)
+
+			rulerStream := push.Stream{
+				Labels:  rulerLbls,
+				Entries: rulerLines,
+				Hash:    rulerMetric.Hash(),
+			}
+
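+			// "tenant" and "caller" are already stream labels, so the parsed
+			// fields get the _extracted suffix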
+			df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{rulerStream}))
+			for _, expected := range []string{"ts", "caller_extracted", "tenant_extracted", "level", "method", "duration", "msg"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 1)
+				require.Equal(t, "logfmt", parsers[0])
+			}
+
+			// no parsers for structured metadata
+			for _, expected := range []string{"detected_level"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 0)
+			}
+		})
+
+		t.Run("correctly applies _extracted for multiple streams", func(t *testing.T) {
+			rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler", tenant="42", caller="inside-the-house"}`
+			rulerMetric, err := parser.ParseMetric(rulerLbls)
+			require.NoError(t, err)
+
+			rulerStream := push.Stream{
+				Labels:  rulerLbls,
+				Entries: rulerLines,
+				Hash:    rulerMetric.Hash(),
+			}
+
+			nginxLbls := `{ cluster="eu-west-1", level="debug", namespace="gateway", pod="nginx-json-oghco", service_name="nginx-json", host="localhost"}`
+			nginxMetric, err := parser.ParseMetric(nginxLbls)
+			require.NoError(t, err)
+
+			nginxStream := push.Stream{
+				Labels:  nginxLbls,
+				Entries: nginxJSONLines,
+				Hash:    nginxMetric.Hash(),
+			}
+
+			df := parseDetectedFields(uint32(20), logqlmodel.Streams([]push.Stream{rulerStream, nginxStream}))
+			for _, expected := range []string{"ts", "caller_extracted", "tenant_extracted", "level", "duration", "msg"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 1)
+				require.Equal(t, "logfmt", parsers[0])
+			}
+
+			for _, expected := range []string{"host_extracted", "user_identifier", "datetime", "request", "protocol", "status", "bytes", "referer"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 1, "expected only json parser for %s", expected)
+				require.Equal(t, "json", parsers[0], "expected only json parser for %s", expected)
+			}
+
+			// multiple parsers for fields that exist in both streams
+			for _, expected := range []string{"method"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 2, "expected logfmt and json parser for %s", expected)
+				require.Contains(t, parsers, "logfmt", "expected logfmt parser for %s", expected)
+				require.Contains(t, parsers, "json", "expected json parser for %s", expected)
+			}
+
+			// no parsers for structured metadata
+			for _, expected := range []string{"detected_level"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 0)
+			}
+		})
+	})
+
+	t.Run("when parsers are supplied", func(t *testing.T) {
+		infoDetectedFieldMetadata := []push.LabelAdapter{
+			{
+				Name:  "detected_level",
+				Value: "info",
+			},
+		}
+
+		parsedRulerFields := func(ts, tenant, duration string) []push.LabelAdapter {
+			return []push.LabelAdapter{
+				{
+					Name:  "ts",
+					Value: ts,
+				},
+				{
+					Name:  "caller",
+					Value: "grpc_logging.go:66",
+				},
+				{
+					Name:  "tenant",
+					Value: tenant,
+				},
+				{
+					Name:  "level",
+					Value: "info",
+				},
+				{
+					Name:  "method",
+					Value: "/cortex.Ingester/Push",
+				},
+				{
+					Name:  "duration",
+					Value: duration,
+				},
+				{
+					Name:  "msg",
+					Value: "gRPC",
+				},
+			}
+		}
+
+		rulerLines := []push.Entry{
+			{
+				Timestamp:          now,
+				Line:               "ts=2024-09-05T15:36:38.757788067Z caller=grpc_logging.go:66 tenant=2419 level=info method=/cortex.Ingester/Push duration=19.098s msg=gRPC",
+				StructuredMetadata: infoDetectedFieldMetadata,
+				Parsed:             parsedRulerFields("2024-09-05T15:36:38.757788067Z", "2419", "19.098s"),
+			},
+			{
+				Timestamp:          now,
+				Line:               "ts=2024-09-05T15:36:38.698375619Z caller=grpc_logging.go:66 tenant=29 level=info method=/cortex.Ingester/Push duration=5.471s msg=gRPC",
+				StructuredMetadata: infoDetectedFieldMetadata,
+				Parsed:             parsedRulerFields("2024-09-05T15:36:38.698375619Z", "29", "5.471s"),
+			},
+			{
+				Timestamp:          now,
+				Line:               "ts=2024-09-05T15:36:38.629424175Z caller=grpc_logging.go:66 tenant=2919 level=info method=/cortex.Ingester/Push duration=29.234s msg=gRPC",
+				StructuredMetadata: infoDetectedFieldMetadata,
+				Parsed:             parsedRulerFields("2024-09-05T15:36:38.629424175Z", "2919", "29.234s"),
+			},
+		}
+
+		rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler"}`
+		rulerMetric, err := parser.ParseMetric(rulerLbls)
+		require.NoError(t, err)
+
+		rulerStream := push.Stream{
+			Labels:  rulerLbls,
+			Entries: rulerLines,
+			Hash:    rulerMetric.Hash(),
+		}
+
+		debugDetectedFieldMetadata := []push.LabelAdapter{
+			{
+				Name:  "detected_level",
+				Value: "debug",
+			},
+		}
+
+		parsedNginxFields := func(host, userIdentifier, datetime, method, request, protocol, status, bytes, referer string) []push.LabelAdapter {
+			return []push.LabelAdapter{
+				{
+					Name:  "host",
+					Value: host,
+				},
+				{
+					Name:  "user_identifier",
+					Value: userIdentifier,
+				},
+				{
+					Name:  "datetime",
+					Value: datetime,
+				},
+				{
+					Name:  "method",
+					Value: method,
+				},
+				{
+					Name:  "request",
+					Value: request,
+				},
+				{
+					Name:  "protocol",
+					Value: protocol,
+				},
+				{
+					Name:  "status",
+					Value: status,
+				},
+				{
+					Name:  "bytes",
+					Value: bytes,
+				},
+				{
+					Name:  "referer",
+					Value: referer,
+				},
+			}
+		}
+
+		nginxJSONLines := []push.Entry{
+			{
+				Timestamp:          now,
+				Line:               `{"host":"100.117.38.203", "user-identifier":"nader3722", "datetime":"05/Sep/2024:16:13:56 +0000", "method": "PATCH", "request": "/api/loki/v1/push", "protocol":"HTTP/2.0", "status":200, "bytes":9664, "referer": "https://www.seniorbleeding-edge.net/exploit/robust/whiteboard"}`,
+				StructuredMetadata: debugDetectedFieldMetadata,
+				Parsed:             parsedNginxFields("100.117.38.203", "nadre3722", "05/Sep/2024:16:13:56 +0000", "PATCH", "/api/loki/v1/push", "HTTP/2.0", "200", "9664", "https://www.seniorbleeding-edge.net/exploit/robust/whiteboard"),
+			},
+			{
+				Timestamp:          now,
+				Line:               `{"host":"66.134.9.30", "user-identifier":"-", "datetime":"05/Sep/2024:16:13:55 +0000", "method": "DELETE", "request": "/api/mimir/v1/push", "protocol":"HTTP/1.1", "status":200, "bytes":18688, "referer": "https://www.districtiterate.biz/synergistic/next-generation/extend"}`,
+				StructuredMetadata: debugDetectedFieldMetadata,
+				Parsed:             parsedNginxFields("66.134.9.30", "-", "05/Sep/2024:16:13:55 +0000", "DELETE", "/api/mimir/v1/push", "HTTP/1.1", "200", "18688", "https://www.districtiterate.biz/synergistic/next-generation/extend"),
+			},
+			{
+				Timestamp:          now,
+				Line:               `{"host":"66.134.9.30", "user-identifier":"-", "datetime":"05/Sep/2024:16:13:55 +0000", "method": "GET", "request": "/api/loki/v1/label/names", "protocol":"HTTP/1.1", "status":200, "bytes":9314, "referer": "https://www.dynamicimplement.info/enterprise/distributed/incentivize/strategic"}`,
+				StructuredMetadata: debugDetectedFieldMetadata,
+				Parsed:             parsedNginxFields("66.134.9.30", "-", "05/Sep/2024:16:13:55 +0000", "GET", "/api/loki/v1/label/names", "HTTP/1.1", "200", "9314", "https://www.dynamicimplement.info/enterprise/distributed/incentivize/strategic"),
+			},
+		}
+
+		nginxLbls := `{ cluster="eu-west-1", level="debug", namespace="gateway", pod="nginx-json-oghco", service_name="nginx-json" }`
+		nginxMetric, err := parser.ParseMetric(nginxLbls)
+		require.NoError(t, err)
+
+		nginxStream := push.Stream{
+			Labels:  nginxLbls,
+			Entries: nginxJSONLines,
+			Hash:    nginxMetric.Hash(),
+		}
+
+		t.Run("detect logfmt fields", func(t *testing.T) {
+			df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{rulerStream}))
+			for _, expected := range []string{"ts", "caller", "tenant", "level", "method", "duration", "msg"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 1)
+				require.Equal(t, "logfmt", parsers[0])
+			}
+
+			// no parsers for structured metadata
+			for _, expected := range []string{"detected_level"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 0)
+			}
+		})
+
+		t.Run("detect json fields", func(t *testing.T) {
+			df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{nginxStream}))
+			for _, expected := range []string{"host", "user_identifier", "datetime", "method", "request", "protocol", "status", "bytes", "referer"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 1)
+				require.Equal(t, "json", parsers[0])
+			}
+
+			// no parsers for structured metadata
+			for _, expected := range []string{"detected_level"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 0)
+			}
+		})
+
+		t.Run("detect mixed fields", func(t *testing.T) {
+			df := parseDetectedFields(uint32(20), logqlmodel.Streams([]push.Stream{rulerStream, nginxStream}))
+
+			for _, expected := range []string{"ts", "caller", "tenant", "level", "duration", "msg"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 1, "expected only logfmt parser for %s", expected)
+				require.Equal(t, "logfmt", parsers[0], "expected only logfmt parser for %s", expected)
+			}
+
+			for _, expected := range []string{"host", "user_identifier", "datetime", "request", "protocol", "status", "bytes", "referer"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 1, "expected only json parser for %s", expected)
+				require.Equal(t, "json", parsers[0], "expected only json parser for %s", expected)
+			}
+
+			// multiple parsers for fields that exist in both streams
+			for _, expected := range []string{"method"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 2, "expected logfmt and json parser for %s", expected)
+				require.Contains(t, parsers, "logfmt", "expected logfmt parser for %s", expected)
+				require.Contains(t, parsers, "json", "expected json parser for %s", expected)
+			}
+
+			// no parsers for structured metadata
+			for _, expected := range []string{"detected_level"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 0)
+			}
+		})
+
+		t.Run("correctly applies _extracted for a single stream", func(t *testing.T) {
+			rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler", tenant="42", caller="inside-the-house"}`
+			rulerMetric, err := parser.ParseMetric(rulerLbls)
+			require.NoError(t, err)
+
+			rulerStream := push.Stream{
+				Labels:  rulerLbls,
+				Entries: rulerLines,
+				Hash:    rulerMetric.Hash(),
+			}
+
+			df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{rulerStream}))
+			for _, expected := range []string{"ts", "caller_extracted", "tenant_extracted", "level", "method", "duration", "msg"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 1)
+				require.Equal(t, "logfmt", parsers[0])
+			}
+
+			// no parsers for structured metadata
+			for _, expected := range []string{"detected_level"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 0)
+			}
+		})
+
+		t.Run("correctly applies _extracted for multiple streams", func(t *testing.T) {
+			rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler", tenant="42", caller="inside-the-house"}`
+			rulerMetric, err := parser.ParseMetric(rulerLbls)
+			require.NoError(t, err)
+
+			rulerStream := push.Stream{
+				Labels:  rulerLbls,
+				Entries: rulerLines,
+				Hash:    rulerMetric.Hash(),
+			}
+
+			nginxLbls := `{ cluster="eu-west-1", level="debug", namespace="gateway", pod="nginx-json-oghco", service_name="nginx-json", host="localhost"}`
+			nginxMetric, err := parser.ParseMetric(nginxLbls)
+			require.NoError(t, err)
+
+			nginxStream := push.Stream{
+				Labels:  nginxLbls,
+				Entries: nginxJSONLines,
+				Hash:    nginxMetric.Hash(),
+			}
+
+			df := parseDetectedFields(uint32(20), logqlmodel.Streams([]push.Stream{rulerStream, nginxStream}))
+			for _, expected := range []string{"ts", "caller_extracted", "tenant_extracted", "level", "duration", "msg"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 1)
+				require.Equal(t, "logfmt", parsers[0])
+			}
+
+			for _, expected := range []string{"host_extracted", "user_identifier", "datetime", "request", "protocol", "status", "bytes", "referer"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 1, "expected only json parser for %s", expected)
+				require.Equal(t, "json", parsers[0], "expected only json parser for %s", expected)
+			}
+
+			// multiple parsers for fields that exist in both streams
+			for _, expected := range []string{"method"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 2, "expected logfmt and json parser for %s", expected)
+				require.Contains(t, parsers, "logfmt", "expected logfmt parser for %s", expected)
+				require.Contains(t, parsers, "json", "expected json parser for %s", expected)
+			}
+
+			// no parsers for structured metadata
+			for _, expected := range []string{"detected_level"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 0)
+			}
+		})
+	})
+
+	t.Run("handles level in all the places", func(t *testing.T) {
+		rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler", tenant="42", caller="inside-the-house", level="debug"}`
+		rulerMetric, err := parser.ParseMetric(rulerLbls)
+		require.NoError(t, err)
+
+		rulerStream := push.Stream{
+			Labels: rulerLbls,
+			Entries: []push.Entry{
+				{
+					Timestamp: now,
+					Line:      "ts=2024-09-05T15:36:38.757788067Z caller=grpc_logging.go:66 tenant=2419 level=info method=/cortex.Ingester/Push duration=19.098s msg=gRPC",
+					StructuredMetadata: []push.LabelAdapter{
+						{
+							Name:  "detected_level",
+							Value: "debug",
+						},
+					},
+					Parsed: []push.LabelAdapter{
+						{
+							Name:  "level",
+							Value: "info",
+						},
+					},
+				},
+			},
+			Hash: rulerMetric.Hash(),
+		}
+
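+		// level appears as a stream label, as structured metadata
+		// (detected_level), and as a parsed field, so the parsed field
+		// becomes level_extracted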
+		df := parseDetectedFields(uint32(20), logqlmodel.Streams([]push.Stream{rulerStream, rulerStream}))
+
+		detectedLevelField := df["detected_level"]
+		require.Len(t, detectedLevelField.parsers, 0)
+		require.Equal(t, uint64(1), detectedLevelField.sketch.Estimate())
+
+		levelField := df["level_extracted"]
+		require.Len(t, levelField.parsers, 1)
+		require.Contains(t, levelField.parsers, "logfmt")
+		require.Equal(t, uint64(1), levelField.sketch.Estimate())
+	})
+}