Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: timestamp change during execution of json_v2 parser. #10657

Merged
merged 2 commits into from
Feb 15, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 22 additions & 21 deletions plugins/parsers/json_v2/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ type Parser struct {

// measurementName is the the name of the current config used in each line protocol
measurementName string
// timestamp is the timestamp used in each line protocol, defaults to time.Now()
timestamp time.Time

// **** Specific for object configuration ****
// subPathResults contains the results of sub-gjson path expressions provided in fields/tags table within object config
Expand Down Expand Up @@ -112,10 +110,12 @@ func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {
}

// timestamp defaults to current time, or can be parsed from the JSON using a GJSON path expression
p.timestamp = time.Now()
timestamp := time.Now()
if c.TimestampPath != "" {
result := gjson.GetBytes(input, c.TimestampPath)

if result.Type == gjson.Null {
p.Log.Debugf("Message: %s", input)
return nil, fmt.Errorf(GJSONPathNUllErrorMSG)
}
if !result.IsArray() && !result.IsObject() {
Expand All @@ -125,24 +125,25 @@ func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {
}

var err error
p.timestamp, err = internal.ParseTimestamp(c.TimestampFormat, result.String(), c.TimestampTimezone)
timestamp, err = internal.ParseTimestamp(c.TimestampFormat, result.String(), c.TimestampTimezone)

if err != nil {
return nil, err
}
}
}

fields, err := p.processMetric(input, c.Fields, false)
fields, err := p.processMetric(input, c.Fields, false, timestamp)
if err != nil {
return nil, err
}

tags, err := p.processMetric(input, c.Tags, true)
tags, err := p.processMetric(input, c.Tags, true, timestamp)
if err != nil {
return nil, err
}

objects, err := p.processObjects(input, c.JSONObjects)
objects, err := p.processObjects(input, c.JSONObjects, timestamp)
if err != nil {
return nil, err
}
Expand All @@ -168,7 +169,7 @@ func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {
// processMetric will iterate over all 'field' or 'tag' configs and create metrics for each
// A field/tag can either be a single value or an array of values, each resulting in its own metric
// For multiple configs, a set of metrics is created from the cartesian product of each separate config
func (p *Parser) processMetric(input []byte, data []DataSet, tag bool) ([]telegraf.Metric, error) {
func (p *Parser) processMetric(input []byte, data []DataSet, tag bool, timestamp time.Time) ([]telegraf.Metric, error) {
if len(data) == 0 {
return nil, nil
}
Expand Down Expand Up @@ -207,13 +208,13 @@ func (p *Parser) processMetric(input []byte, data []DataSet, tag bool) ([]telegr
p.measurementName,
map[string]string{},
map[string]interface{}{},
p.timestamp,
timestamp,
),
Result: result,
}

// Expand all array's and nested arrays into separate metrics
nodes, err := p.expandArray(mNode)
nodes, err := p.expandArray(mNode, timestamp)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -259,15 +260,15 @@ func mergeMetric(a telegraf.Metric, m telegraf.Metric) {
}

// expandArray will recursively create a new MetricNode for each element in a JSON array or single value
func (p *Parser) expandArray(result MetricNode) ([]telegraf.Metric, error) {
func (p *Parser) expandArray(result MetricNode, timestamp time.Time) ([]telegraf.Metric, error) {
var results []telegraf.Metric

if result.IsObject() {
if !p.iterateObjects {
p.Log.Debugf("Found object in query ignoring it please use 'object' to gather metrics from objects")
return results, nil
}
r, err := p.combineObject(result)
r, err := p.combineObject(result, timestamp)
if err != nil {
return nil, err
}
Expand All @@ -285,14 +286,14 @@ func (p *Parser) expandArray(result MetricNode) ([]telegraf.Metric, error) {
p.measurementName,
map[string]string{},
map[string]interface{}{},
p.timestamp,
timestamp,
)
if val.IsObject() {
n := result
n.ParentIndex += val.Index
n.Metric = m
n.Result = val
r, err := p.combineObject(n)
r, err := p.combineObject(n, timestamp)
if err != nil {
return false
}
Expand All @@ -311,7 +312,7 @@ func (p *Parser) expandArray(result MetricNode) ([]telegraf.Metric, error) {
n.ParentIndex += val.Index
n.Metric = m
n.Result = val
r, err := p.expandArray(n)
r, err := p.expandArray(n, timestamp)
if err != nil {
return false
}
Expand Down Expand Up @@ -400,7 +401,7 @@ func (p *Parser) existsInpathResults(index int) *PathResult {
}

// processObjects will iterate over all 'object' configs and create metrics for each
func (p *Parser) processObjects(input []byte, objects []JSONObject) ([]telegraf.Metric, error) {
func (p *Parser) processObjects(input []byte, objects []JSONObject, timestamp time.Time) ([]telegraf.Metric, error) {
p.iterateObjects = true
var t []telegraf.Metric
for _, c := range objects {
Expand Down Expand Up @@ -449,11 +450,11 @@ func (p *Parser) processObjects(input []byte, objects []JSONObject) ([]telegraf.
p.measurementName,
map[string]string{},
map[string]interface{}{},
p.timestamp,
timestamp,
),
Result: result,
}
metrics, err := p.expandArray(rootObject)
metrics, err := p.expandArray(rootObject, timestamp)
if err != nil {
return nil, err
}
Expand All @@ -465,7 +466,7 @@ func (p *Parser) processObjects(input []byte, objects []JSONObject) ([]telegraf.

// combineObject will add all fields/tags to a single metric
// If the object has multiple array's as elements it won't comine those, they will remain separate metrics
func (p *Parser) combineObject(result MetricNode) ([]telegraf.Metric, error) {
func (p *Parser) combineObject(result MetricNode, timestamp time.Time) ([]telegraf.Metric, error) {
var results []telegraf.Metric
if result.IsArray() || result.IsObject() {
var err error
Expand Down Expand Up @@ -519,12 +520,12 @@ func (p *Parser) combineObject(result MetricNode) ([]telegraf.Metric, error) {
arrayNode.Tag = tag

if val.IsObject() {
results, err = p.combineObject(arrayNode)
results, err = p.combineObject(arrayNode, timestamp)
if err != nil {
return false
}
} else {
r, err := p.expandArray(arrayNode)
r, err := p.expandArray(arrayNode, timestamp)
if err != nil {
return false
}
Expand Down