
json_v2 wrong timestamp on records #10651

Closed
kgrimsby opened this issue Feb 14, 2022 · 7 comments · Fixed by #10657
Labels
area/json (json and json_v2 parser/serialiser related), area/kafka, bug (unexpected problem or unintended behavior)

Comments

@kgrimsby (Contributor) commented Feb 14, 2022

Relevant telegraf.conf

[agent]
collection_jitter = "0s"
debug = true
flush_interval = "10s"
flush_jitter = "0s"
hostname = "$HOSTNAME"
interval = "20s"
logfile = ""
metric_batch_size = 1000
metric_buffer_limit = 100000
omit_hostname = false
precision = "1ms"
quiet = false
round_interval = true
[[processors.enum]]
  [[processors.enum.mapping]]
     dest = "status_code"
     field = "status"
     [processors.enum.mapping.value_mappings]
            critical = 3
            healthy = 1
            problem = 2

[[outputs.file]]
        files = ["temp.carbon"]
        data_format="carbon2"

[[inputs.kafka_consumer]]
      brokers = [
        "xxx"
      ]
      client_id = "local-telegraf-test2"
      consumer_group = "local-telegraf-test2"
      data_format = "json_v2"
      enable_tls = true
      insecure_skip_verify = true
      max_message_len = 1000000
      sasl_mechanism = "PLAIN"
      sasl_password = ""
      sasl_username = ""
      topics = [
        "test"
      ]
      topic_tag = "topic"
      max_processing_time = "1000ms"
      offset = "oldest"
      [[inputs.kafka_consumer.json_v2]]
        measurement_name = "weight"
        timestamp_format = "unix_ms"
        timestamp_path = "weight_createdAt"
        [[inputs.kafka_consumer.json_v2.field]]
          path = "weight_weight"
          rename = "weight"
          type = "float"
        [[inputs.kafka_consumer.json_v2.tag]]
          path = "weight_serialNumber"
          rename = "serial_number"
        [[inputs.kafka_consumer.json_v2.tag]]
          path = "weight_imei"
          rename = "imei"
        [[inputs.kafka_consumer.json_v2.tag]]
          path = "sensor_customer_name"
          rename = "customer_name"
        [[inputs.kafka_consumer.json_v2.tag]]
          path = "sensor_distributor_name"
          rename = "distributor_name"
        [[inputs.kafka_consumer.json_v2.tag]]
          path = "sensor_dist_name"

Logs from Telegraf

No relevant logs

System info

Telegraf 1.21.3, tested on macOS and Kubernetes (Docker image)

Docker

No response

Steps to reproduce

  1. Add data to the kafka topic (a larger volume seems to produce more "wrong" records)

...

This was tested with data going from kafka to both InfluxDB Cloud and a Carbon output file. Both outputs show the same error, but with different results.

Expected behavior

Expect the carbon output to mirror the kafka topic

Actual behavior

$ cat temp.carbon| grep "164482" | grep "0004G"
metric=weight field=weight customer_name=Shakey's distributor_name=xx host=$HOSTNAME imei=356611075967160 sensor_dist_name=null serial_number=0004G topic=telegraf-test2  64.4540023803711 1644824511
metric=weight field=weight customer_name=Shakey's distributor_name=xx host=$HOSTNAME imei=356611075967160 sensor_dist_name=null serial_number=0004G topic=telegraf-test2  52.34299850463867 1644825774
metric=weight field=weight customer_name=Shakey's distributor_name=xx host=$HOSTNAME imei=356611075967160 sensor_dist_name=null serial_number=0004G topic=telegraf-test2  69.56500244140625 1644820026
metric=weight field=weight customer_name=Shakey's distributor_name=xx host=$HOSTNAME imei=356611075967160 sensor_dist_name=null serial_number=0004G topic=telegraf-test2  69.45500183105469 1644820627
metric=weight field=weight customer_name=Shakey's distributor_name=xx host=$HOSTNAME imei=356611075967160 sensor_dist_name=null serial_number=0004G topic=telegraf-test2  69.24700164794922 1644821228
metric=weight field=weight customer_name=Shakey's distributor_name=xx host=$HOSTNAME imei=356611075967160 sensor_dist_name=null serial_number=0004G topic=telegraf-test2  69.16100311279297 1644821829
metric=weight field=weight customer_name=Shakey's distributor_name=xx host=$HOSTNAME imei=356611075967160 sensor_dist_name=null serial_number=0004G topic=telegraf-test2  68.97699737548828 1644822430
metric=weight field=weight customer_name=Shakey's distributor_name=xx host=$HOSTNAME imei=356611075967160 sensor_dist_name=null serial_number=0004G topic=telegraf-test2  68.78099822998047 1644823031
metric=weight field=weight customer_name=Shakey's distributor_name=xx host=$HOSTNAME imei=356611075967160 sensor_dist_name=null serial_number=0004G topic=telegraf-test2  68.65799713134766 1644823632
metric=weight field=weight customer_name=Shakey's distributor_name=xx host=$HOSTNAME imei=356611075967160 sensor_dist_name=null serial_number=0004G topic=telegraf-test2  68.55999755859375 1644824233
metric=weight field=weight customer_name=Shakey's distributor_name=xx host=$HOSTNAME imei=356611075967160 sensor_dist_name=null serial_number=0004G topic=telegraf-test2  68.44999694824219 1644824834
metric=weight field=weight customer_name=Shakey's distributor_name=xx host=$HOSTNAME imei=356611075967160 sensor_dist_name=null serial_number=0004G topic=telegraf-test2  68.3759994506836 1644825435
metric=weight field=weight customer_name=Shakey's distributor_name=xx host=$HOSTNAME imei=356611075967160 sensor_dist_name=null serial_number=0004G topic=telegraf-test2  68.25399780273438 1644826036
ksql> select FROM_UNIXTIME(weight_createdat), weight_serialnumber, weight_weight, weight_createdat from telegraf_test where weight_serialnumber = '0004G' and weight_createdat > 1644820000000 emit changes;
+---------------------------+---------------------------+---------------------------+---------------------------+
|KSQL_COL_0                 |WEIGHT_SERIALNUMBER        |WEIGHT_WEIGHT              |WEIGHT_CREATEDAT           |
+---------------------------+---------------------------+---------------------------+---------------------------+
|2022-02-14T06:27:06.857    |0004G                      |69.56500244140625          |1644820026857              |
|2022-02-14T06:37:07.857    |0004G                      |69.45500183105469          |1644820627857              |
|2022-02-14T06:47:08.857    |0004G                      |69.24700164794922          |1644821228857              |
|2022-02-14T06:57:09.857    |0004G                      |69.16100311279297          |1644821829857              |
|2022-02-14T07:07:10.857    |0004G                      |68.97699737548828          |1644822430857              |
|2022-02-14T07:17:11.857    |0004G                      |68.78099822998047          |1644823031857              |
|2022-02-14T07:27:12.857    |0004G                      |68.65799713134766          |1644823632857              |
|2022-02-14T07:37:13.857    |0004G                      |68.55999755859375          |1644824233857              |
|2022-02-14T07:47:14.857    |0004G                      |68.44999694824219          |1644824834857              |
|2022-02-14T07:57:15.857    |0004G                      |68.3759994506836           |1644825435857              |
|2022-02-14T08:07:16.857    |0004G                      |68.25399780273438          |1644826036857              |
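
Note the correspondence: carbon lines 3 through 13 match the ksql rows exactly (same weights, timestamps truncated to seconds), while the first two carbon lines carry timestamps (1644824511, 1644825774) and weights that appear nowhere in the ksql output, i.e. those records were apparently stamped with timestamps taken from other messages.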

Additional info

No response

@kgrimsby added the bug (unexpected problem or unintended behavior) label Feb 14, 2022
@kgrimsby (Contributor, Author) commented

Added the timestamp column as a tag:

metric=weight field=weight created=1.644528693201e+12 customer_name=Shakey's host=$HOSTNAME imei=356611075967160 serial_number=0004G topic=telegraf-test2  52.35499954223633 1644826706
metric=weight field=weight created=1.644820026857e+12 customer_name=Shakey's host=$HOSTNAME imei=356611075967160 serial_number=0004G topic=telegraf-test2  69.56500244140625 1644820026

As you can see, the timestamp columns clearly don't match on the first line: the created tag holds 1644528693201 (unix ms) while the record was written with timestamp 1644826706 (unix s).

@kgrimsby (Contributor, Author) commented

More debugging:

Added this instrumentation to parser.go in json_v2 to compare the timestamp parsed into p.timestamp against the raw value read from timestamp_path:

115                p.timestamp = time.Now()
116                if c.TimestampPath != "" {
117                        result := gjson.GetBytes(input, c.TimestampPath)
118                        test, err := strconv.ParseInt(result.String(), 10, 64)
119                        ts := result.Int()
120                        if err != nil {
121                                p.Log.Debugf("Error parsing string ts")
122                        }
123                        //p.Log.Debugf("Timestamp type: %s / %s / %s", result.Type, result.String(), test)
124                        //p.Log.Debugf("Message: %s", input)
125                        if result.Type == gjson.Null {
126                                p.Log.Debugf("Message: %s", input)
127                                return nil, fmt.Errorf(GJSONPathNUllErrorMSG)
128                        }
129                        if !result.IsArray() && !result.IsObject() {
130                                if c.TimestampFormat == "" {
131                                        err := fmt.Errorf("use of 'timestamp_query' requires 'timestamp_format'")
132                                        return nil, err
133                                }
134
135                                var err error
136                                p.timestamp, err = internal.ParseTimestamp(c.TimestampFormat, ts, c.TimestampTimezone)
137                                if err != nil {
138                                        return nil, err
139                                }
140                        }
141
142                        if test != p.timestamp.UnixMilli() || ts != p.timestamp.UnixMilli() {
143                                p.Log.Debugf("%s / %s / %s / %s", p.timestamp, p.timestamp.UnixMilli(), test, ts)
144
145                                p.Log.Debugf("TIMESTAMP DOESNT MATCH!!")
146                                p.Log.Debugf("TIMESTAMP DOESNT MATCH!!")
147                                p.Log.Debugf("TIMESTAMP DOESNT MATCH!!")
148                                p.Log.Debugf("%s", p.timestamp.UnixMilli())
149                                p.Log.Debugf("%s", test)
150                                p.Log.Debugf("%s", ts)
151                                p.Log.Debugf("%s", p.timestamp.UnixMilli() - test)
152                                p.Log.Debugf("%s", p.timestamp.UnixMilli() - ts)
153
154                        }
155                }

Output (the %!s(int64=...) artifacts come from printing int64 values with the %s verb):

2022-02-15T07:46:17Z D! [parsers.json_v2::kafka_consumer] 2022-01-15 15:21:25.256 +0000 UTC / %!s(int64=1642260085256) / %!s(int64=1641975171000) / %!s(int64=1641975171000)
2022-02-15T07:46:17Z D! [parsers.json_v2::kafka_consumer] TIMESTAMP DOESNT MATCH!!
2022-02-15T07:46:17Z D! [parsers.json_v2::kafka_consumer] TIMESTAMP DOESNT MATCH!!
2022-02-15T07:46:17Z D! [parsers.json_v2::kafka_consumer] TIMESTAMP DOESNT MATCH!!
2022-02-15T07:46:17Z D! [parsers.json_v2::kafka_consumer] %!s(int64=1644911177034)
2022-02-15T07:46:17Z D! [parsers.json_v2::kafka_consumer] %!s(int64=1641975171000)
2022-02-15T07:46:17Z D! [parsers.json_v2::kafka_consumer] %!s(int64=1641975171000)
2022-02-15T07:46:17Z D! [parsers.json_v2::kafka_consumer] %!s(int64=284971882)
2022-02-15T07:46:17Z D! [parsers.json_v2::kafka_consumer] %!s(int64=284971882)
---
2022-02-15T07:48:27Z D! [parsers.json_v2::kafka_consumer] 2022-02-12 17:00:15 +0000 UTC / %!s(int64=1642366748911) / %!s(int64=1642315404036) / %!s(int64=1642315404036)
2022-02-15T07:48:27Z D! [parsers.json_v2::kafka_consumer] TIMESTAMP DOESNT MATCH!!
2022-02-15T07:48:27Z D! [parsers.json_v2::kafka_consumer] TIMESTAMP DOESNT MATCH!!
2022-02-15T07:48:27Z D! [parsers.json_v2::kafka_consumer] TIMESTAMP DOESNT MATCH!!
2022-02-15T07:48:27Z D! [parsers.json_v2::kafka_consumer] %!s(int64=1642366833244)
2022-02-15T07:48:27Z D! [parsers.json_v2::kafka_consumer] %!s(int64=1642315404036)
2022-02-15T07:48:27Z D! [parsers.json_v2::kafka_consumer] %!s(int64=1642315404036)
2022-02-15T07:48:27Z D! [parsers.json_v2::kafka_consumer] %!s(int64=2373416964)
2022-02-15T07:48:27Z D! [parsers.json_v2::kafka_consumer] %!s(int64=2595903061)
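
Note that p.timestamp even changes between Debugf calls within a single Parse invocation: in the first block the comparison line prints it as 1642260085256, yet a few lines later it prints as 1644911177034, which is 07:46:17.034 UTC, the very moment of logging. Since this code path does not write the field in between, that looks like line 115's p.timestamp = time.Now() being executed by a concurrent Parse call.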

@Hipska added the area/json (json and json_v2 parser/serialiser related) and area/json_v2 labels and removed the platform/darwin and area/kafka labels Feb 15, 2022
@Hipska (Contributor) commented Feb 15, 2022

You might be facing the same issue as #10606; selecting the timestamp does seem broken in v1.21.3. There is already a PR that I think resolves it: #10618

@kgrimsby (Contributor, Author) commented

Tested with that PR; the problem still occurs.

@kgrimsby (Contributor, Author) commented

This seemed to solve my problem (I'm not using objects): the timestamp is kept in a local variable that is passed down to processMetric as a parameter, rather than read back from the shared p.timestamp field:

115                timestamp := time.Now()
116                p.timestamp = timestamp
117                if c.TimestampPath != "" {
118                        result := gjson.GetBytes(input, c.TimestampPath)
119                        test, err := strconv.ParseInt(result.String(), 10, 64)
120                        ts := result.Int()
121
122                        wresult := gjson.GetBytes(input, "weight_weight").Float()
123                        if err != nil {
124                                p.Log.Debugf("Error parsing string ts")
125                        }
126
127                        //p.Log.Debugf("Timestamp type: %s / %s / %s", result.Type, result.String(), test)
128                        //p.Log.Debugf("Message: %s", input)
129                        if result.Type == gjson.Null {
130                                p.Log.Debugf("Message: %s", input)
131                                return nil, fmt.Errorf(GJSONPathNUllErrorMSG)
132                        }
133                        if !result.IsArray() && !result.IsObject() {
134                                if c.TimestampFormat == "" {
135                                        err := fmt.Errorf("use of 'timestamp_query' requires 'timestamp_format'")
136                                        return nil, err
137                                }
138
139                                var err error
140                                timestamp, err = internal.ParseTimestamp(c.TimestampFormat, result.String(), c.TimestampTimezone)
141                                p.timestamp = timestamp
142
143                                if err != nil {
144                                        return nil, err
145                                }
146                        }
----
167                fields, err := p.processMetric(input, c.Fields, false, timestamp)
168                if err != nil {
169                        return nil, err
170                }
171
172                tags, err := p.processMetric(input, c.Tags, true, timestamp)
173                if err != nil {
174                        return nil, err
175                }
----
203func (p *Parser) processMetric(input []byte, data []DataSet, tag bool, timestamp time.Time) ([]telegraf.Metric, error) {
----
233                mNode := MetricNode{
234                        OutputName:  setName,
235                        SetName:     setName,
236                        DesiredType: c.Type,
237                        Tag:         tag,
238                        Metric: metric.New(
239                                p.measurementName,
240                                map[string]string{},
241                                map[string]interface{}{},
242                                timestamp,
243                        ),
244                        Result: result,
245                }

@kgrimsby (Contributor, Author) commented

Could this be related to the fact that my kafka topic has 3 partitions? I think sarama spawns a consumer (and therefore a parser) for each partition, and what I'm experiencing is one parser overwriting p.timestamp in the middle of another's execution.
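
If that is what is happening, the pattern reduces to a shared struct field written by concurrent goroutines. Here is a minimal standalone sketch of that suspected pattern (illustrative only, not Telegraf code; the partition count and timestamps are made up), which go run -race will flag:

package main

import (
	"fmt"
	"sync"
	"time"
)

// parser mimics a json_v2-style parser that keeps the current message's
// timestamp in a shared struct field.
type parser struct {
	timestamp time.Time
}

// parse writes the message timestamp to the struct, does some work, then
// reads the field back, the way metric construction does.
func (p *parser) parse(ts time.Time) time.Time {
	p.timestamp = ts             // write to shared state
	time.Sleep(time.Millisecond) // window for another goroutine to overwrite it
	return p.timestamp           // may now hold a different message's timestamp
}

func main() {
	p := &parser{}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ { // one goroutine per partition, as sarama would spawn
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			want := time.Unix(int64(1644820026+600*i), 0)
			if got := p.parse(want); !got.Equal(want) {
				fmt.Printf("partition %d: want %v, got %v\n", i, want, got)
			}
		}(i)
	}
	wg.Wait()
}

Keeping the timestamp in a local variable that is passed down as a parameter, as in the diff above, removes the shared write entirely.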

@Hipska (Contributor) commented Feb 15, 2022

Yeah, indeed it doesn't seem this struct field is needed. You are right, a local variable is better.

Great that you figured it out.
