From 1c71aa6d44181ae39dc00f444d51fa04d03d59fd Mon Sep 17 00:00:00 2001 From: bas smit Date: Fri, 11 Oct 2024 11:36:45 +0200 Subject: [PATCH] node stats: support infinity These values are often float64 but sometimes a string, from the api docs[1]: > An "Infinity" value for a given flow window indicates that worker > millis have been spent without any events completing processing This adds a custom float64 type to enable the use of a custom marshaller. [1]: https://www.elastic.co/guide/en/logstash/current/node-stats-api.html --- fetcher/responses/nodestats_response.go | 63 ++++++++++++++++++------- 1 file changed, 47 insertions(+), 16 deletions(-) diff --git a/fetcher/responses/nodestats_response.go b/fetcher/responses/nodestats_response.go index 258586c..64a6070 100644 --- a/fetcher/responses/nodestats_response.go +++ b/fetcher/responses/nodestats_response.go @@ -1,6 +1,11 @@ package responses -import "time" +import ( + "encoding/json" + "fmt" + "math" + "time" +) type PipelineResponse struct { Workers int `json:"workers"` @@ -87,28 +92,28 @@ type EventsResponse struct { type FlowResponse struct { InputThroughput struct { - Current float64 `json:"current"` - Lifetime float64 `json:"lifetime"` + Current InfinityFloat `json:"current"` + Lifetime InfinityFloat `json:"lifetime"` } `json:"input_throughput"` FilterThroughput struct { - Current float64 `json:"current"` - Lifetime float64 `json:"lifetime"` + Current InfinityFloat `json:"current"` + Lifetime InfinityFloat `json:"lifetime"` } `json:"filter_throughput"` OutputThroughput struct { - Current float64 `json:"current"` - Lifetime float64 `json:"lifetime"` + Current InfinityFloat `json:"current"` + Lifetime InfinityFloat `json:"lifetime"` } `json:"output_throughput"` QueueBackpressure struct { - Current float64 `json:"current"` - Lifetime float64 `json:"lifetime"` + Current InfinityFloat `json:"current"` + Lifetime InfinityFloat `json:"lifetime"` } `json:"queue_backpressure"` WorkerConcurrency struct { - Current float64 `json:"current"` - Lifetime float64 `json:"lifetime"` + Current InfinityFloat `json:"current"` + Lifetime InfinityFloat `json:"lifetime"` } `json:"worker_concurrency"` WorkerUtilization struct { - Current float64 `json:"current"` - Lifetime float64 `json:"lifetime"` + Current InfinityFloat `json:"current"` + Lifetime InfinityFloat `json:"lifetime"` } `json:"worker_utilization"` } @@ -154,11 +159,11 @@ type SinglePipelineResponse struct { } `json:"events"` Flow struct { WorkerUtilization struct { - Current float64 `json:"current"` - Lifetime float64 `json:"lifetime"` + Current InfinityFloat `json:"current"` + Lifetime InfinityFloat `json:"lifetime"` } `json:"worker_utilization"` WorkerMillisPerEvent struct { - Lifetime float64 `json:"lifetime"` + Lifetime InfinityFloat `json:"lifetime"` } `json:"worker_millis_per_event"` } `json:"flow"` } `json:"filters"` @@ -286,3 +291,29 @@ type NodeStatsResponse struct { Pipelines map[string]SinglePipelineResponse `json:"pipelines"` } + +// InfinityFloat is a float type that also accepts the string Infinity +type InfinityFloat float64 + +func (i *InfinityFloat) UnmarshalJSON(data []byte) error { + var s string + err := json.Unmarshal(data, &s) + if err == nil { + if s == "Infinity" { + *i = InfinityFloat(math.Inf(1)) + return nil + } else if s == "-Infinity" { + *i = InfinityFloat(math.Inf(-1)) + return nil + } + fmt.Errorf("Invalid string value for InfinityFloat: %s", s) + } + + var f float64 + if err := json.Unmarshal(data, &f); err != nil { + return err + } + + *i = InfinityFloat(f) + return nil +}