Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sqlqueryreceiver) add error count metrics for receiver #3

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 44 additions & 3 deletions receiver/sqlqueryreceiver/internal/observability/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,25 @@ import (
func init() {
err := view.Register(
viewAcceptedLogs,
viewErrorCount,
)
if err != nil {
fmt.Printf("Failed to register sqlquery receiver's views: %v\n", err)
}
}

const (
StartError = "start_error"
CollectError = "collect_error"
)

var (
mAcceptedLogs = stats.Int64("receiver/accepted/log/records", "Number of log record pushed into the pipeline.", "")
receiverKey, _ = tag.NewKey("receiver") // nolint:errcheck
queryKey, _ = tag.NewKey("query") // nolint:errcheck
mAcceptedLogs = stats.Int64("receiver/accepted/log/records", "Number of log record pushed into the pipeline.", "")
mErrorCount = stats.Int64("receiver/error/count", "Number of errors", "")

receiverKey, _ = tag.NewKey("receiver") // nolint:errcheck
queryKey, _ = tag.NewKey("query") // nolint:errcheck
errorTypeKey, _ = tag.NewKey("error_type") // nolint:errcheck
)

var viewAcceptedLogs = &view.View{
Expand All @@ -45,6 +54,14 @@ var viewAcceptedLogs = &view.View{
Aggregation: view.Sum(),
}

var viewErrorCount = &view.View{
Name: mErrorCount.Name(),
Description: mErrorCount.Description(),
Measure: mErrorCount,
TagKeys: []tag.Key{receiverKey, queryKey, errorTypeKey},
Aggregation: view.Sum(),
}

func RecordAcceptedLogs(acceptedLogs int64, receiver string, query string) error {
return stats.RecordWithTags(
context.Background(),
Expand All @@ -55,3 +72,27 @@ func RecordAcceptedLogs(acceptedLogs int64, receiver string, query string) error
mAcceptedLogs.M(acceptedLogs),
)
}

func RecordErrorCount(errorType string, receiver string, query string) error {
return stats.RecordWithTags(
context.Background(),
[]tag.Mutator{
tag.Insert(receiverKey, receiver),
tag.Insert(queryKey, query),
tag.Insert(errorTypeKey, errorType),
},
mErrorCount.M(int64(1)),
)
}

func RecordNoErrorCount(errorType string, receiver string, query string) error {
return stats.RecordWithTags(
context.Background(),
[]tag.Mutator{
tag.Insert(receiverKey, receiver),
tag.Insert(queryKey, query),
tag.Insert(errorTypeKey, errorType),
},
mErrorCount.M(int64(0)),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,20 +86,34 @@ func metricReader(chData chan []*metricdata.Metric, fail chan struct{}, count in
}

func TestMetrics(t *testing.T) {
const (
receiver = "sqlquery/my-name"
queryReceiver = "query-0: select * from simple_logs"
)
type testCase struct {
name string
recordFunc string
value int64
name string
recordFunc string
value int64
labels map[string]string
labelsOrder []string // order of labels in timeseries, label values have the same order as keys in the metric descriptor
}
tests := []testCase{
{
name: "receiver/accepted/log/records",
recordFunc: "RecordAcceptedLogs",
value: 10,
labels: map[string]string{
"receiver": "sqlquery/my-name",
"query": "query-0: select * from simple_logs",
},
labelsOrder: []string{"query", "receiver"},
},
{
name: "receiver/error/count",
recordFunc: "RecordErrorCount",
value: 1,
labels: map[string]string{
"receiver": "sqlquery/my-logs",
"query": "query-1: select * from simple_logs",
"error_type": StartError,
},
labelsOrder: []string{"error_type", "query", "receiver"},
},
}

Expand All @@ -113,7 +127,11 @@ func TestMetrics(t *testing.T) {
for _, tt := range tests {
switch tt.recordFunc {
case "RecordAcceptedLogs":
require.NoError(t, RecordAcceptedLogs(tt.value, receiver, queryReceiver))
require.NoError(t, RecordAcceptedLogs(tt.value, tt.labels["receiver"], tt.labels["query"]))
case "RecordErrorCount":
require.NoError(t, RecordErrorCount(tt.labels["error_type"], tt.labels["receiver"], tt.labels["query"]))
case "RecordNoErrorCount":
require.NoError(t, RecordNoErrorCount(tt.labels["error_type"], tt.labels["receiver"], tt.labels["query"]))
}
}

Expand Down Expand Up @@ -141,11 +159,14 @@ func TestMetrics(t *testing.T) {
require.Len(t, d.TimeSeries, 1)
require.Len(t, d.TimeSeries[0].Points, 1)
assert.Equal(t, d.TimeSeries[0].Points[0].Value, tt.value)
require.Len(t, d.TimeSeries[0].LabelValues, 2)
require.True(t, d.TimeSeries[0].LabelValues[0].Present)
assert.Equal(t, d.TimeSeries[0].LabelValues[0].Value, queryReceiver)
require.True(t, d.TimeSeries[0].LabelValues[1].Present)
assert.Equal(t, d.TimeSeries[0].LabelValues[1].Value, receiver)
require.Len(t, d.TimeSeries[0].LabelValues, len(tt.labels))

for i, label := range d.TimeSeries[0].LabelValues {
val, ok := tt.labels[tt.labelsOrder[i]]
assert.True(t, ok)
assert.True(t, label.Present)
assert.Equal(t, label.Value, val)
}
})
}
}
15 changes: 15 additions & 0 deletions receiver/sqlqueryreceiver/logs_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,17 @@ func (receiver *logsReceiver) Start(ctx context.Context, host component.Host) er
for _, queryReceiver := range receiver.queryReceivers {
err := queryReceiver.start()
if err != nil {
if err := observability.RecordErrorCount(observability.StartError, receiver.id.String(), queryReceiver.ID()); err != nil {
receiver.settings.Logger.Debug("error recording metric for errors count", zap.Error(err))
}
return err
}

if err := observability.RecordNoErrorCount(observability.StartError, receiver.id.String(), queryReceiver.ID()); err != nil {
receiver.settings.Logger.Debug("error recording metric for errors count", zap.Error(err))
}
}

receiver.startCollecting()
receiver.settings.Logger.Debug("started.")
return nil
Expand Down Expand Up @@ -128,6 +136,13 @@ func (receiver *logsReceiver) collect() {
logs, err := queryReceiver.collect(context.Background())
if err != nil {
receiver.settings.Logger.Error("Error collecting logs", zap.Error(err), zap.String("query", queryReceiver.ID()))
if err := observability.RecordErrorCount(observability.CollectError, receiver.id.String(), queryReceiver.ID()); err != nil {
receiver.settings.Logger.Debug("error for recording metric for errors count", zap.Error(err))
}
}

if err := observability.RecordNoErrorCount(observability.CollectError, receiver.id.String(), queryReceiver.ID()); err != nil {
receiver.settings.Logger.Debug("error for recording metric for errors count", zap.Error(err))
}

if err := observability.RecordAcceptedLogs(int64(logs.LogRecordCount()), receiver.id.String(), queryReceiver.id); err != nil {
Expand Down