Skip to content

Commit

Permalink
fix: add producer byte rate
Browse files Browse the repository at this point in the history
  • Loading branch information
shivanshuraj1333 committed Dec 3, 2024
1 parent 6288080 commit d016e49
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 12 deletions.
22 changes: 13 additions & 9 deletions pkg/query-service/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3141,14 +3141,14 @@ func (aH *APIHandler) getProducerThroughputOverview(
Hash: make(map[string]struct{}),
}

queryRangeParams, err := mq.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview", attributeCache)
producerQueryRangeParams, err := mq.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview", attributeCache)
if err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
return
}

if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
if err := validateQueryRangeParamsV3(producerQueryRangeParams); err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
return
Expand All @@ -3157,7 +3157,7 @@ func (aH *APIHandler) getProducerThroughputOverview(
var result []*v3.Result
var errQuriesByName map[string]error

result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams)
result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), producerQueryRangeParams)
if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErrObj, errQuriesByName)
Expand All @@ -3179,7 +3179,7 @@ func (aH *APIHandler) getProducerThroughputOverview(
}
}

queryRangeParams, err = mq.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview-latency", attributeCache)
queryRangeParams, err := mq.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview-byte-rate", attributeCache)
if err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
Expand All @@ -3198,13 +3198,13 @@ func (aH *APIHandler) getProducerThroughputOverview(
return
}

latencyColumn := &v3.Result{QueryName: "latency"}
latencyColumn := &v3.Result{QueryName: "byte_rate"}
var latencySeries []*v3.Series
for _, res := range resultFetchLatency {
for _, series := range res.Series {
topic, topicOk := series.Labels["topic"]
serviceName, serviceNameOk := series.Labels["service_name"]
params := []string{topic, serviceName}
params := []string{serviceName, topic}
hashKey := uniqueIdentifier(params, "#")
_, ok := attributeCache.Hash[hashKey]
if topicOk && serviceNameOk && ok {
Expand All @@ -3214,12 +3214,16 @@ func (aH *APIHandler) getProducerThroughputOverview(
}

latencyColumn.Series = latencySeries
result = append(result, latencyColumn)
var latencyColumnResult []*v3.Result
latencyColumnResult = append(latencyColumnResult, latencyColumn)

resultFetchLatency = postprocess.TransformToTableForBuilderQueries(result, queryRangeParams)
resultFetchLatency = postprocess.TransformToTableForBuilderQueries(latencyColumnResult, queryRangeParams)

result = postprocess.TransformToTableForClickHouseQueries(result)

result = append(result, resultFetchLatency[0])
resp := v3.QueryRangeResponse{
Result: resultFetchLatency,
Result: result,
}
aH.Respond(w, resp)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,17 @@ func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType strin

func buildBuilderQueriesProducerBytes(unixMilliStart, unixMilliEnd int64, attributeCache *Clients) (map[string]*v3.BuilderQuery, error) {
bq := make(map[string]*v3.BuilderQuery)
queryName := fmt.Sprintf("latency")
queryName := fmt.Sprintf("byte_rate")

chq := &v3.BuilderQuery{
QueryName: queryName,
StepInterval: common.MinAllowedStepInterval(unixMilliStart, unixMilliEnd),
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "kafka_producer_byte_rate",
Key: "kafka_producer_byte_rate",
DataType: v3.AttributeKeyDataTypeFloat64,
Type: v3.AttributeKeyType("Gauge"),
IsColumn: true,
},
AggregateOperator: v3.AggregateOperatorAvg,
Temporality: v3.Unspecified,
Expand Down Expand Up @@ -276,7 +279,7 @@ func BuildQRParamsWithCache(messagingQueue *MessagingQueue, queryContext string,
cq, err = buildCompositeQuery(&v3.ClickHouseQuery{
Query: query,
}, queryContext)
} else if queryContext == "producer-throughput-overview-latency" {
} else if queryContext == "producer-throughput-overview-byte-rate" {
bhq, err := buildBuilderQueriesProducerBytes(unixMilliStart, unixMilliEnd, attributeCache)
if err != nil {
return nil, err
Expand All @@ -285,6 +288,7 @@ func BuildQRParamsWithCache(messagingQueue *MessagingQueue, queryContext string,
QueryType: v3.QueryTypeBuilder,
BuilderQueries: bhq,
PanelType: v3.PanelTypeTable,
FillGaps: false,
}
}

Expand Down

0 comments on commit d016e49

Please sign in to comment.