Skip to content

Commit

Permalink
feat(sqlqueryreceiver) add error count metrics for receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
kasia-kujawa committed May 12, 2023
1 parent f055733 commit 0821dcc
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 16 deletions.
47 changes: 44 additions & 3 deletions receiver/sqlqueryreceiver/internal/observability/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,25 @@ import (
func init() {
err := view.Register(
viewAcceptedLogs,
viewErrorCount,
)
if err != nil {
fmt.Printf("Failed to register sqlquery receiver's views: %v\n", err)
}
}

const (
StartError = "start_error"
CollectError = "collect_error"
)

var (
mAcceptedLogs = stats.Int64("receiver/accepted/log/records", "Number of log record pushed into the pipeline.", "")
receiverKey, _ = tag.NewKey("receiver") // nolint:errcheck
queryKey, _ = tag.NewKey("query") // nolint:errcheck
mAcceptedLogs = stats.Int64("receiver/accepted/log/records", "Number of log record pushed into the pipeline.", "")
mErrorCount = stats.Int64("receiver/error/count", "Number of errors", "")

receiverKey, _ = tag.NewKey("receiver") // nolint:errcheck
queryKey, _ = tag.NewKey("query") // nolint:errcheck
errorTypeKey, _ = tag.NewKey("error_type") // nolint:errcheck
)

var viewAcceptedLogs = &view.View{
Expand All @@ -45,6 +54,14 @@ var viewAcceptedLogs = &view.View{
Aggregation: view.Sum(),
}

var viewErrorCount = &view.View{
Name: mErrorCount.Name(),
Description: mErrorCount.Description(),
Measure: mErrorCount,
TagKeys: []tag.Key{receiverKey, queryKey, errorTypeKey},
Aggregation: view.Sum(),
}

func RecordAcceptedLogs(acceptedLogs int64, receiver string, query string) error {
return stats.RecordWithTags(
context.Background(),
Expand All @@ -55,3 +72,27 @@ func RecordAcceptedLogs(acceptedLogs int64, receiver string, query string) error
mAcceptedLogs.M(acceptedLogs),
)
}

func RecordErrorCount(errorType string, receiver string, query string) error {
return stats.RecordWithTags(
context.Background(),
[]tag.Mutator{
tag.Insert(receiverKey, receiver),
tag.Insert(queryKey, query),
tag.Insert(errorTypeKey, errorType),
},
mErrorCount.M(int64(1)),
)
}

func RecordNoErrorCount(errorType string, receiver string, query string) error {
return stats.RecordWithTags(
context.Background(),
[]tag.Mutator{
tag.Insert(receiverKey, receiver),
tag.Insert(queryKey, query),
tag.Insert(errorTypeKey, errorType),
},
mErrorCount.M(int64(0)),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,20 +86,34 @@ func metricReader(chData chan []*metricdata.Metric, fail chan struct{}, count in
}

func TestMetrics(t *testing.T) {
const (
receiver = "sqlquery/my-name"
queryReceiver = "query-0: select * from simple_logs"
)
type testCase struct {
name string
recordFunc string
value int64
name string
recordFunc string
value int64
labels map[string]string
labelsOrder []string // order of labels in timeseries, label values have the same order as keys in the metric descriptor
}
tests := []testCase{
{
name: "receiver/accepted/log/records",
recordFunc: "RecordAcceptedLogs",
value: 10,
labels: map[string]string{
"receiver": "sqlquery/my-name",
"query": "query-0: select * from simple_logs",
},
labelsOrder: []string{"query", "receiver"},
},
{
name: "receiver/error/count",
recordFunc: "RecordErrorCount",
value: 1,
labels: map[string]string{
"receiver": "sqlquery/my-logs",
"query": "query-1: select * from simple_logs",
"error_type": StartError,
},
labelsOrder: []string{"error_type", "query", "receiver"},
},
}

Expand All @@ -113,7 +127,11 @@ func TestMetrics(t *testing.T) {
for _, tt := range tests {
switch tt.recordFunc {
case "RecordAcceptedLogs":
require.NoError(t, RecordAcceptedLogs(tt.value, receiver, queryReceiver))
require.NoError(t, RecordAcceptedLogs(tt.value, tt.labels["receiver"], tt.labels["query"]))
case "RecordErrorCount":
require.NoError(t, RecordErrorCount(tt.labels["error_type"], tt.labels["receiver"], tt.labels["query"]))
case "RecordNoErrorCount":
require.NoError(t, RecordNoErrorCount(tt.labels["error_type"], tt.labels["receiver"], tt.labels["query"]))
}
}

Expand Down Expand Up @@ -141,11 +159,14 @@ func TestMetrics(t *testing.T) {
require.Len(t, d.TimeSeries, 1)
require.Len(t, d.TimeSeries[0].Points, 1)
assert.Equal(t, d.TimeSeries[0].Points[0].Value, tt.value)
require.Len(t, d.TimeSeries[0].LabelValues, 2)
require.True(t, d.TimeSeries[0].LabelValues[0].Present)
assert.Equal(t, d.TimeSeries[0].LabelValues[0].Value, queryReceiver)
require.True(t, d.TimeSeries[0].LabelValues[1].Present)
assert.Equal(t, d.TimeSeries[0].LabelValues[1].Value, receiver)
require.Len(t, d.TimeSeries[0].LabelValues, len(tt.labels))

for i, label := range d.TimeSeries[0].LabelValues {
val, ok := tt.labels[tt.labelsOrder[i]]
assert.True(t, ok)
assert.True(t, label.Present)
assert.Equal(t, label.Value, val)
}
})
}
}
15 changes: 15 additions & 0 deletions receiver/sqlqueryreceiver/logs_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,17 @@ func (receiver *logsReceiver) Start(ctx context.Context, host component.Host) er
for _, queryReceiver := range receiver.queryReceivers {
err := queryReceiver.start()
if err != nil {
if err := observability.RecordErrorCount(observability.StartError, receiver.id.String(), queryReceiver.ID()); err != nil {
receiver.settings.Logger.Debug("error recording metric for errors count", zap.Error(err))
}
return err
}

if err := observability.RecordNoErrorCount(observability.StartError, receiver.id.String(), queryReceiver.ID()); err != nil {
receiver.settings.Logger.Debug("error recording metric for errors count", zap.Error(err))
}
}

receiver.startCollecting()
receiver.settings.Logger.Debug("started.")
return nil
Expand Down Expand Up @@ -128,6 +136,13 @@ func (receiver *logsReceiver) collect() {
logs, err := queryReceiver.collect(context.Background())
if err != nil {
receiver.settings.Logger.Error("Error collecting logs", zap.Error(err), zap.String("query", queryReceiver.ID()))
if err := observability.RecordErrorCount(observability.CollectError, receiver.id.String(), queryReceiver.ID()); err != nil {
receiver.settings.Logger.Debug("error for recording metric for errors count", zap.Error(err))
}
}

if err := observability.RecordNoErrorCount(observability.CollectError, receiver.id.String(), queryReceiver.ID()); err != nil {
receiver.settings.Logger.Debug("error for recording metric for errors count", zap.Error(err))
}

if err := observability.RecordAcceptedLogs(int64(logs.LogRecordCount()), receiver.id.String(), queryReceiver.id); err != nil {
Expand Down

0 comments on commit 0821dcc

Please sign in to comment.