Skip to content

Commit

Permalink
Elasticsearch: Fix processing of raw_data with not-recognized time fo…
Browse files Browse the repository at this point in the history
…rmat (#78262)

* Elasticsearch: Fix non-standard time field in raw data queries

* Update snapshot tests

* Refactor
  • Loading branch information
ivanahuckova authored Nov 20, 2023
1 parent cfc67a9 commit 28f4c3e
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 1 deletion.
5 changes: 5 additions & 0 deletions pkg/tsdb/elasticsearch/data_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,11 @@ func processDocumentQuery(q *Query, b *es.SearchRequestBuilder, from, to int64,
b.Sort(es.SortOrderDesc, defaultTimeField, "boolean")
b.Sort(es.SortOrderDesc, "_doc", "")
b.AddDocValueField(defaultTimeField)
if isRawDataQuery(q) {
// For raw_data queries we need to add timeField as field with standardized time format to not receive
// invalid formats that elasticsearch can parse, but our frontend can't (e.g. yyyy_MM_dd_HH_mm_ss)
b.AddTimeFieldWithStandardizedFormat(defaultTimeField)
}
b.Size(stringToIntWithDefaultValue(metric.Settings.Get("size").MustString(), defaultSize))
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/tsdb/elasticsearch/response_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,15 @@ func processRawDataResponse(res *es.SearchResponse, target *Query, configuredFie
doc[k] = v
}

if hit["fields"] != nil {
source, ok := hit["fields"].(map[string]interface{})
if ok {
for k, v := range source {
doc[k] = v
}
}
}

for key := range doc {
propNames[key] = true
}
Expand Down
86 changes: 86 additions & 0 deletions pkg/tsdb/elasticsearch/response_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,92 @@ func TestProcessRawDataResponse(t *testing.T) {
require.Equal(t, filterableConfig, *field.Config)
}
})

t.Run("gets correct time field from fields", func(t *testing.T) {
query := []byte(`
[
{
"refId": "A",
"metrics": [{ "type": "raw_data", "id": "1" }]
}
]
`)

response := []byte(`
{
"responses": [
{
"aggregations": {},
"hits": {
"hits": [
{
"_id": "fdsfs",
"_type": "_doc",
"_index": "mock-index",
"_source": {
"testtime": "06/24/2019",
"host": "djisaodjsoad",
"number": 1,
"line": "hello, i am a message",
"level": "debug",
"fields": { "lvl": "debug" }
},
"highlight": {
"message": [
"@HIGHLIGHT@hello@/HIGHLIGHT@, i am a @HIGHLIGHT@message@/HIGHLIGHT@"
]
},
"fields": {
"testtime": [ "2019-06-24T09:51:19.765Z" ]
}
},
{
"_id": "kdospaidopa",
"_type": "_doc",
"_index": "mock-index",
"_source": {
"testtime": "06/24/2019",
"host": "dsalkdakdop",
"number": 2,
"line": "hello, i am also message",
"level": "error",
"fields": { "lvl": "info" }
},
"highlight": {
"message": [
"@HIGHLIGHT@hello@/HIGHLIGHT@, i am a @HIGHLIGHT@message@/HIGHLIGHT@"
]
},
"fields": {
"testtime": [ "2019-06-24T09:52:19.765Z" ]
}
}
]
}
}
]
}
`)
result, err := queryDataTest(query, response)
require.NoError(t, err)

require.Len(t, result.response.Responses, 1)
frames := result.response.Responses["A"].Frames
require.Len(t, frames, 1)

logsFrame := frames[0]

logsFieldMap := make(map[string]*data.Field)
for _, field := range logsFrame.Fields {
logsFieldMap[field.Name] = field
}
t0 := time.Date(2019, time.June, 24, 9, 51, 19, 765000000, time.UTC)
t1 := time.Date(2019, time.June, 24, 9, 52, 19, 765000000, time.UTC)
require.Contains(t, logsFieldMap, "testtime")
require.Equal(t, data.FieldTypeNullableTime, logsFieldMap["testtime"].Type())
require.Equal(t, &t0, logsFieldMap["testtime"].At(0))
require.Equal(t, &t1, logsFieldMap["testtime"].At(1))
})
}

func TestProcessRawDocumentResponse(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,12 @@
"_doc": {
"order": "desc"
}
}
},
"fields":
[
{
"field": "testtime",
"format": "strict_date_optional_time_nanos"
}
]
}

0 comments on commit 28f4c3e

Please sign in to comment.