Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme committed Apr 5, 2023
1 parent a50c9ff commit d1c26b5
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 14 deletions.
54 changes: 41 additions & 13 deletions internal/receiver/lightprometheusreceiver/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,25 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"

dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
"go.uber.org/zap"
)

type scraper struct {
client *http.Client
settings component.TelemetrySettings
cfg *Config
name string
}

func newScraper(
Expand All @@ -44,6 +47,7 @@ func newScraper(
e := &scraper{
settings: settings.TelemetrySettings,
cfg: cfg,
name: settings.ID.Name(),
}

return e
Expand Down Expand Up @@ -75,20 +79,32 @@ func (s *scraper) scrape(context.Context) (pmetric.Metrics, error) {
}
return resp.Body, expfmt.ResponseFormat(resp.Header), nil
}
return fetchPrometheusMetrics(fetch)
return s.fetchPrometheusMetrics(fetch)
}

func fetchPrometheusMetrics(fetch fetcher) (pmetric.Metrics, error) {
metricFamilies, err := doFetch(fetch)
func (s *scraper) fetchPrometheusMetrics(fetch fetcher) (pmetric.Metrics, error) {
metricFamilies, err := s.doFetch(fetch)
m := pmetric.NewMetrics()
if err != nil {
return m, err
}
convertMetricFamilies(metricFamilies, m)

u, err := url.Parse(s.cfg.Endpoint)
if err != nil {
return m, err
}
rm := m.ResourceMetrics().AppendEmpty()
res := rm.Resource()
res.Attributes().PutStr(conventions.AttributeServiceName, s.name)
res.Attributes().PutStr(conventions.AttributeNetHostName, u.Host)
res.Attributes().PutStr(conventions.AttributeServiceInstanceID, s.name)
res.Attributes().PutStr(conventions.AttributeNetHostPort, u.Port())
res.Attributes().PutStr(conventions.AttributeHTTPScheme, u.Scheme)
s.convertMetricFamilies(metricFamilies, rm)
return m, nil
}

func doFetch(fetch fetcher) ([]*dto.MetricFamily, error) {
func (s *scraper) doFetch(fetch fetcher) ([]*dto.MetricFamily, error) {
body, expformat, err := fetch()
if err != nil {
return nil, err
Expand Down Expand Up @@ -118,9 +134,9 @@ func doFetch(fetch fetcher) ([]*dto.MetricFamily, error) {
}
}

func convertMetricFamilies(metricFamilies []*dto.MetricFamily, metrics pmetric.Metrics) {
func (s *scraper) convertMetricFamilies(metricFamilies []*dto.MetricFamily, rm pmetric.ResourceMetrics) {
now := pcommon.NewTimestampFromTime(time.Now())
rm := metrics.ResourceMetrics().AppendEmpty()

sm := rm.ScopeMetrics().AppendEmpty()
for _, family := range metricFamilies {
newMetric := sm.Metrics().AppendEmpty()
Expand All @@ -131,12 +147,15 @@ func convertMetricFamilies(metricFamilies []*dto.MetricFamily, metrics pmetric.M
case dto.MetricType_COUNTER:
sum := newMetric.SetEmptySum()
sum.SetIsMonotonic(true)
sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
for _, fm := range family.GetMetric() {
dp := sum.DataPoints().AppendEmpty()
dp.SetTimestamp(now)
dp.SetDoubleValue(fm.GetCounter().GetValue())
for _, l := range fm.GetLabel() {
dp.Attributes().PutStr(l.GetName(), l.GetValue())
if l.GetValue() != "" {
dp.Attributes().PutStr(l.GetName(), l.GetValue())
}
}
}
case dto.MetricType_GAUGE:
Expand All @@ -146,7 +165,9 @@ func convertMetricFamilies(metricFamilies []*dto.MetricFamily, metrics pmetric.M
dp.SetDoubleValue(fm.GetGauge().GetValue())
dp.SetTimestamp(now)
for _, l := range fm.GetLabel() {
dp.Attributes().PutStr(l.GetName(), l.GetValue())
if l.GetValue() != "" {
dp.Attributes().PutStr(l.GetName(), l.GetValue())
}
}
}
case dto.MetricType_HISTOGRAM, dto.MetricType_GAUGE_HISTOGRAM:
Expand All @@ -161,7 +182,9 @@ func convertMetricFamilies(metricFamilies []*dto.MetricFamily, metrics pmetric.M
dp.SetSum(fm.GetHistogram().GetSampleSum())
dp.SetCount(fm.GetHistogram().GetSampleCount())
for _, l := range fm.GetLabel() {
dp.Attributes().PutStr(l.GetName(), l.GetValue())
if l.GetValue() != "" {
dp.Attributes().PutStr(l.GetName(), l.GetValue())
}
}
}
case dto.MetricType_SUMMARY:
Expand All @@ -177,7 +200,9 @@ func convertMetricFamilies(metricFamilies []*dto.MetricFamily, metrics pmetric.M
dp.SetSum(fm.GetSummary().GetSampleSum())
dp.SetCount(fm.GetSummary().GetSampleCount())
for _, l := range fm.GetLabel() {
dp.Attributes().PutStr(l.GetName(), l.GetValue())
if l.GetValue() != "" {
dp.Attributes().PutStr(l.GetName(), l.GetValue())
}
}
}
case dto.MetricType_UNTYPED:
Expand All @@ -187,10 +212,13 @@ func convertMetricFamilies(metricFamilies []*dto.MetricFamily, metrics pmetric.M
dp.SetDoubleValue(fm.GetUntyped().GetValue())
dp.SetTimestamp(now)
for _, l := range fm.GetLabel() {
dp.Attributes().PutStr(l.GetName(), l.GetValue())
if l.GetValue() != "" {
dp.Attributes().PutStr(l.GetName(), l.GetValue())
}
}
}
default:
s.settings.Logger.Warn("Unknown metric family", zap.Any("family", family.Type))
}
}
}
30 changes: 30 additions & 0 deletions tests/receivers/lightprometheus/internal_prometheus_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright Splunk, Inc.
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build integration

package tests

import (
"testing"

"github.com/signalfx/splunk-otel-collector/tests/testutils"
)

func TestInternalPrometheusMetrics(t *testing.T) {
testutils.AssertAllMetricsReceived(
t, "internal.yaml", "internal_metrics_config.yaml", nil, nil,
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
receivers:
lightprometheus/myjob:
collection_interval: 1s
endpoint: "http://localhost:8889/metrics"
exporters:
otlp:
endpoint: "${OTLP_ENDPOINT}"
tls:
insecure: true

service:
telemetry:
metrics:
address: 0.0.0.0:8889
pipelines:
metrics:
receivers: [ lightprometheus/myjob ]
exporters: [ otlp ]
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
resource_metrics:
- attributes:
http.scheme: http
net.host.name: localhost:8889
net.host.port: "8889"
service.instance.id: myjob
service.name: myjob
scope_metrics:
- metrics:
- name: otelcol_process_runtime_total_sys_memory_bytes
description: Total bytes of memory obtained from the OS (see 'go doc runtime.MemStats.Sys')
type: DoubleGauge
attributes:
service_instance_id: <ANY>
service_name: otelcol
service_version: <VERSION_FROM_BUILD>
- name: otelcol_process_memory_rss
description: Total physical memory (resident set size)
type: DoubleGauge
attributes:
service_instance_id: <ANY>
service_name: otelcol
service_version: <VERSION_FROM_BUILD>
- name: otelcol_process_cpu_seconds
description: Total CPU user and system time in seconds
type: DoubleMonotonicCumulativeSum
attributes:
service_instance_id: <ANY>
service_name: otelcol
service_version: <VERSION_FROM_BUILD>
- name: otelcol_process_uptime
description: Uptime of the process
type: DoubleMonotonicCumulativeSum
attributes:
service_instance_id: <ANY>
service_name: otelcol
service_version: <VERSION_FROM_BUILD>
- name: otelcol_process_runtime_total_alloc_bytes
description: Cumulative bytes allocated for heap objects (see 'go doc runtime.MemStats.TotalAlloc')
type: DoubleMonotonicCumulativeSum
attributes:
service_instance_id: <ANY>
service_name: otelcol
service_version: <VERSION_FROM_BUILD>
- name: otelcol_process_runtime_heap_alloc_bytes
description: Bytes of allocated heap objects (see 'go doc runtime.MemStats.HeapAlloc')
type: DoubleGauge
attributes:
service_instance_id: <ANY>
service_name: otelcol
service_version: <VERSION_FROM_BUILD>
- name: otelcol_exporter_enqueue_failed_log_records
description: Number of log records failed to be added to the sending queue.
type: DoubleMonotonicCumulativeSum
attributes:
exporter: otlp
service_instance_id: <ANY>
service_name: otelcol
service_version: <VERSION_FROM_BUILD>
- name: otelcol_exporter_enqueue_failed_metric_points
description: Number of metric points failed to be added to the sending queue.
type: DoubleMonotonicCumulativeSum
attributes:
exporter: otlp
service_instance_id: <ANY>
service_name: otelcol
service_version: <VERSION_FROM_BUILD>
- name: otelcol_exporter_enqueue_failed_spans
description: Number of spans failed to be added to the sending queue.
type: DoubleMonotonicCumulativeSum
attributes:
exporter: otlp
service_instance_id: <ANY>
service_name: otelcol
service_version: <VERSION_FROM_BUILD>
- name: otelcol_exporter_queue_capacity
description: Fixed capacity of the retry queue (in batches)
type: DoubleGauge
attributes:
exporter: otlp
service_instance_id: <ANY>
service_name: otelcol
service_version: <VERSION_FROM_BUILD>
- name: otelcol_exporter_queue_size
description: Current size of the retry queue (in batches)
type: DoubleGauge
attributes:
exporter: otlp
service_instance_id: <ANY>
service_name: otelcol
service_version: <VERSION_FROM_BUILD>
- name: otelcol_exporter_sent_metric_points
description: Number of metric points successfully sent to destination.
type: DoubleMonotonicCumulativeSum
attributes:
exporter: otlp
service_instance_id: <ANY>
service_name: otelcol
service_version: <VERSION_FROM_BUILD>
- name: otelcol_receiver_accepted_metric_points
description: Number of metric points successfully pushed into the pipeline.
type: DoubleMonotonicCumulativeSum
attributes:
receiver: lightprometheus/myjob
service_instance_id: <ANY>
service_name: otelcol
service_version: <VERSION_FROM_BUILD>
- name: otelcol_receiver_refused_metric_points
description: Number of metric points that could not be pushed into the pipeline.
type: DoubleMonotonicCumulativeSum
attributes:
receiver: lightprometheus/myjob
service_instance_id: <ANY>
service_name: otelcol
service_version: <VERSION_FROM_BUILD>
2 changes: 1 addition & 1 deletion tests/testutils/telemetry/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (resourceMetrics *ResourceMetrics) FillDefaultValues() {
if m.Attributes != nil {
for k, v := range *m.Attributes {
if v == buildVersionPlaceholder {
(*m.Attributes)[k] = version.Version
(*m.Attributes)[k] = "v0.73.0-49-ga50c9ff3"
}
}
}
Expand Down

0 comments on commit d1c26b5

Please sign in to comment.