[lightprometheusreceiver] initial working draft #2921

Merged: 8 commits, Apr 6, 2023
1 change: 1 addition & 0 deletions docs/components.md
@@ -29,6 +29,7 @@ The distribution offers support for the following components.
| [kafkametrics](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/kafkametricsreceiver) | [beta] |
| [kafka](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/kafkareceiver) | [beta] |
| [kubeletstats](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/kubeletstatsreceiver) | [beta] |
+ | [lightprometheus](../internal/receiver/lightprometheusreceiver) | [in development] |
| [mongodbatlas](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/mongodbatlasreceiver) | [beta] |
| [oracledb](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/oracledbreceiver) | [alpha] |
| [otlp](https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver/otlpreceiver) | [stable] |
4 changes: 2 additions & 2 deletions go.mod
@@ -79,6 +79,8 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/windowseventlogreceiver v0.75.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/windowsperfcountersreceiver v0.75.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver v0.75.0
+ github.com/prometheus/client_model v0.3.0
+ github.com/prometheus/common v0.42.0
github.com/signalfx/golib/v3 v3.3.49
github.com/signalfx/signalfx-agent v1.0.1-0.20230222185249-54e5d1064c5b
github.com/signalfx/splunk-otel-collector/pkg/extension/smartagentextension v0.72.0
@@ -137,8 +139,6 @@ require (
github.com/ovh/go-ovh v1.3.0 // indirect
github.com/panta/machineid v1.0.2 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
- github.com/prometheus/client_model v0.3.0 // indirect
- github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/common/sigv4 v0.1.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/prometheus/prometheus v0.43.0 // indirect
2 changes: 2 additions & 0 deletions internal/components/components.go
@@ -97,6 +97,7 @@ import (
"github.com/signalfx/splunk-otel-collector/internal/exporter/pulsarexporter"
"github.com/signalfx/splunk-otel-collector/internal/receiver/databricksreceiver"
"github.com/signalfx/splunk-otel-collector/internal/receiver/discoveryreceiver"
"github.com/signalfx/splunk-otel-collector/internal/receiver/lightprometheusreceiver"
"github.com/signalfx/splunk-otel-collector/pkg/extension/smartagentextension"
"github.com/signalfx/splunk-otel-collector/pkg/processor/timestampprocessor"
"github.com/signalfx/splunk-otel-collector/pkg/receiver/smartagentreceiver"
@@ -128,6 +129,7 @@ func Get() (otelcol.Factories, error) {
cloudfoundryreceiver.NewFactory(),
collectdreceiver.NewFactory(),
databricksreceiver.NewFactory(),
+ lightprometheusreceiver.NewFactory(),
discoveryreceiver.NewFactory(),
fluentforwardreceiver.NewFactory(),
filelogreceiver.NewFactory(),
1 change: 1 addition & 0 deletions internal/components/components_test.go
@@ -58,6 +58,7 @@ func TestDefaultComponents(t *testing.T) {
"kafkametrics",
"kubeletstats",
"mongodbatlas",
"lightprometheus",
"oracledb",
"otlp",
"postgresql",
39 changes: 39 additions & 0 deletions internal/receiver/lightprometheusreceiver/config.go
@@ -0,0 +1,39 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lightprometheusreceiver

import (
    "time"

    "go.opentelemetry.io/collector/component"
    "go.opentelemetry.io/collector/config/confighttp"
    "go.opentelemetry.io/collector/receiver/scraperhelper"
)

func createDefaultConfig() component.Config {
    scs := scraperhelper.NewDefaultScraperControllerSettings(typeStr)
    // set the default collection interval to 30 seconds which is half of the
    // lowest job frequency of 1 minute
    scs.CollectionInterval = time.Second * 30
    return &Config{
        ScraperControllerSettings: scs,
        HTTPClientSettings:        confighttp.NewDefaultHTTPClientSettings(),
    }
}

type Config struct {
    confighttp.HTTPClientSettings           `mapstructure:",squash"`
    scraperhelper.ScraperControllerSettings `mapstructure:",squash"`
}
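For context, a minimal configuration sketch (not part of this change): the `endpoint` key comes from the embedded `HTTPClientSettings` and `collection_interval` from the embedded `ScraperControllerSettings`; the endpoint value and the logging exporter are illustrative assumptions.

```yaml
receivers:
  lightprometheus:
    # Hypothetical scrape target; any Prometheus exposition endpoint works.
    endpoint: http://localhost:8889/metrics
    # Optional; defaults to 30s via createDefaultConfig above.
    collection_interval: 1m

exporters:
  logging:

service:
  pipelines:
    metrics:
      receivers: [lightprometheus]
      exporters: [logging]
```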
57 changes: 57 additions & 0 deletions internal/receiver/lightprometheusreceiver/factory.go
@@ -0,0 +1,57 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lightprometheusreceiver

import (
    "context"

    "go.opentelemetry.io/collector/component"
    "go.opentelemetry.io/collector/consumer"
    "go.opentelemetry.io/collector/receiver"
    "go.opentelemetry.io/collector/receiver/scraperhelper"
)

const typeStr = "lightprometheus"

func NewFactory() receiver.Factory {
    return receiver.NewFactory(
        typeStr,
        createDefaultConfig,
        receiver.WithMetrics(createMetricsReceiver, component.StabilityLevelDevelopment),
    )
}

// createMetricsReceiver creates a metrics receiver for scraping Prometheus metrics.
func createMetricsReceiver(
    _ context.Context,
    params receiver.CreateSettings,
    rConf component.Config,
    consumer consumer.Metrics,
) (receiver.Metrics, error) {
    c, _ := rConf.(*Config)
    s := newScraper(params, c)

    scraper, err := scraperhelper.NewScraper(typeStr, s.scrape, scraperhelper.WithStart(s.start))
    if err != nil {
        return nil, err
    }

    return scraperhelper.NewScraperControllerReceiver(
        &c.ScraperControllerSettings,
        params,
        consumer,
        scraperhelper.AddScraper(scraper),
    )
}
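A rough sketch of how the factory might be exercised in a unit test, assuming the collector's standard receivertest and consumertest helpers (hypothetical code, not part of this PR):

```go
package lightprometheusreceiver

import (
    "context"
    "testing"

    "github.com/stretchr/testify/require"
    "go.opentelemetry.io/collector/consumer/consumertest"
    "go.opentelemetry.io/collector/receiver/receivertest"
)

// Hypothetical sketch: build the receiver from the factory defaults and a nop consumer.
func TestCreateMetricsReceiver(t *testing.T) {
    factory := NewFactory()
    cfg := factory.CreateDefaultConfig()

    rcvr, err := factory.CreateMetricsReceiver(
        context.Background(),
        receivertest.NewNopCreateSettings(),
        cfg,
        consumertest.NewNop(),
    )
    require.NoError(t, err)
    require.NotNil(t, rcvr)
}
```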
223 changes: 223 additions & 0 deletions internal/receiver/lightprometheusreceiver/scraper.go
@@ -0,0 +1,223 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lightprometheusreceiver

import (
    "context"
    "fmt"
    "io"
    "net/http"
    "net/url"
    "strings"
    "time"

    dto "github.com/prometheus/client_model/go"
    "github.com/prometheus/common/expfmt"
    "go.opentelemetry.io/collector/component"
    "go.opentelemetry.io/collector/pdata/pcommon"
    "go.opentelemetry.io/collector/pdata/pmetric"
    "go.opentelemetry.io/collector/receiver"
    conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
    "go.uber.org/zap"
)

type scraper struct {
    client   *http.Client
    settings component.TelemetrySettings
    cfg      *Config
    name     string
}

func newScraper(
    settings receiver.CreateSettings,
    cfg *Config,
) *scraper {
    e := &scraper{
        settings: settings.TelemetrySettings,
        cfg:      cfg,
        name:     settings.ID.Name(),
    }

    return e
}

func (s *scraper) start(_ context.Context, host component.Host) error {
    var err error
    s.client, err = s.cfg.ToClient(host, s.settings)
    return err
}

type fetcher func() (io.ReadCloser, expfmt.Format, error)

func (s *scraper) scrape(context.Context) (pmetric.Metrics, error) {
    fetch := func() (io.ReadCloser, expfmt.Format, error) {
        req, err := http.NewRequest("GET", s.cfg.Endpoint, nil)
        if err != nil {
            return nil, expfmt.FmtUnknown, err
        }

        resp, err := s.client.Do(req)
        if err != nil {
            return nil, expfmt.FmtUnknown, err
        }

        if resp.StatusCode != 200 {
            body, _ := io.ReadAll(resp.Body)
            return nil, expfmt.FmtUnknown, fmt.Errorf("light prometheus %s returned status %d: %s", s.cfg.Endpoint, resp.StatusCode, string(body))
        }
        return resp.Body, expfmt.ResponseFormat(resp.Header), nil
    }
    return s.fetchPrometheusMetrics(fetch)
}

func (s *scraper) fetchPrometheusMetrics(fetch fetcher) (pmetric.Metrics, error) {
    metricFamilies, err := s.doFetch(fetch)
    m := pmetric.NewMetrics()
    if err != nil {
        return m, err
    }

    u, err := url.Parse(s.cfg.Endpoint)
    if err != nil {
        return m, err
    }
    rm := m.ResourceMetrics().AppendEmpty()
    res := rm.Resource()
    res.Attributes().PutStr(conventions.AttributeServiceName, s.name)
    res.Attributes().PutStr(conventions.AttributeNetHostName, u.Host)
    res.Attributes().PutStr(conventions.AttributeServiceInstanceID, s.name)
    res.Attributes().PutStr(conventions.AttributeNetHostPort, u.Port())
    res.Attributes().PutStr(conventions.AttributeHTTPScheme, u.Scheme)
    s.convertMetricFamilies(metricFamilies, rm)
    return m, nil
}
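The fetcher indirection keeps the decode-and-convert path testable without a live HTTP server. A minimal sketch of that idea (hypothetical test code, not part of this PR), feeding canned text exposition through the scraper:

```go
package lightprometheusreceiver

import (
    "io"
    "strings"
    "testing"

    "github.com/prometheus/common/expfmt"
    "github.com/stretchr/testify/require"
    "go.opentelemetry.io/collector/receiver/receivertest"
)

// Hypothetical sketch: a stub fetcher returns a fixed Prometheus exposition.
func TestFetchPrometheusMetrics(t *testing.T) {
    exposition := "# HELP http_requests_total Total HTTP requests.\n" +
        "# TYPE http_requests_total counter\n" +
        "http_requests_total{code=\"200\"} 1027\n"

    s := newScraper(receivertest.NewNopCreateSettings(), createDefaultConfig().(*Config))

    fetch := func() (io.ReadCloser, expfmt.Format, error) {
        return io.NopCloser(strings.NewReader(exposition)), expfmt.FmtText, nil
    }

    metrics, err := s.fetchPrometheusMetrics(fetch)
    require.NoError(t, err)
    // One counter family should yield one metric on the single resource.
    require.Equal(t, 1, metrics.MetricCount())
}
```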

func (s *scraper) doFetch(fetch fetcher) ([]*dto.MetricFamily, error) {
    body, expformat, err := fetch()
    if err != nil {
        return nil, err
    }
    defer body.Close()
    var decoder expfmt.Decoder
    // some "text" responses are missing \n from the last line
    if expformat != expfmt.FmtProtoDelim {
        decoder = expfmt.NewDecoder(io.MultiReader(body, strings.NewReader("\n")), expformat)
    } else {
        decoder = expfmt.NewDecoder(body, expformat)
    }

    var mfs []*dto.MetricFamily

    for {
        var mf dto.MetricFamily
        err := decoder.Decode(&mf)

        if err == io.EOF {
            return mfs, nil
        } else if err != nil {
            return nil, err
        }

        mfs = append(mfs, &mf)
    }
}

func (s *scraper) convertMetricFamilies(metricFamilies []*dto.MetricFamily, rm pmetric.ResourceMetrics) {
    now := pcommon.NewTimestampFromTime(time.Now())

    sm := rm.ScopeMetrics().AppendEmpty()
    for _, family := range metricFamilies {
        newMetric := sm.Metrics().AppendEmpty()
        newMetric.SetName(family.GetName())
        newMetric.SetDescription(family.GetHelp())
        switch *family.Type {
        case dto.MetricType_COUNTER:
            sum := newMetric.SetEmptySum()
            sum.SetIsMonotonic(true)
            sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
            for _, fm := range family.GetMetric() {
                dp := sum.DataPoints().AppendEmpty()
                dp.SetTimestamp(now)
                dp.SetDoubleValue(fm.GetCounter().GetValue())
                for _, l := range fm.GetLabel() {
                    if l.GetValue() != "" {
                        dp.Attributes().PutStr(l.GetName(), l.GetValue())
                    }
                }
            }
        case dto.MetricType_GAUGE:
            gauge := newMetric.SetEmptyGauge()
            for _, fm := range family.Metric {
                dp := gauge.DataPoints().AppendEmpty()
                dp.SetDoubleValue(fm.GetGauge().GetValue())
                dp.SetTimestamp(now)
                for _, l := range fm.GetLabel() {
                    if l.GetValue() != "" {
                        dp.Attributes().PutStr(l.GetName(), l.GetValue())
                    }
                }
            }
        case dto.MetricType_HISTOGRAM, dto.MetricType_GAUGE_HISTOGRAM:
Review discussion on the histogram conversion:

Contributor:
Would we consider adding an option to this receiver to drop certain types of prom metrics? I have a use case where I want to collect prom metrics from some k8s components but am not really interested in the histogram metrics, because the buckets are generated dynamically and, without native UI support for histograms in O11y, visualizing them is not a good experience. I would rather not collect them at all instead of adding exclusion rules for specific metrics.

Contributor Author:
Good point, yes we can and I think we should. @atoulme wdyt?

Contributor:
The usual guidance is to drop the metrics later in a filter processor. I think the signalfx exporter also translates histograms to a different metric type. We can probably filter at the source though, while we're in here.

Contributor Author (@dloucasfx, Apr 5, 2023):
Right, filtering at the source will save us a few bytes... anything we can do to save one bit of memory at a time :-)

Contributor:
Let's do it in a separate PR. Less is more.

Contributor (@atoulme, Apr 12, 2023):
OTL-2009 is filed to follow up.
            histogram := newMetric.SetEmptyHistogram()
            for _, fm := range family.Metric {
                dp := histogram.DataPoints().AppendEmpty()
                dp.SetTimestamp(now)
                for _, b := range fm.GetHistogram().GetBucket() {
                    dp.BucketCounts().Append(b.GetCumulativeCount())
                    dp.ExplicitBounds().Append(b.GetUpperBound())
                }
                dp.SetSum(fm.GetHistogram().GetSampleSum())
                dp.SetCount(fm.GetHistogram().GetSampleCount())
                for _, l := range fm.GetLabel() {
                    if l.GetValue() != "" {
                        dp.Attributes().PutStr(l.GetName(), l.GetValue())
                    }
                }
            }
        case dto.MetricType_SUMMARY:
            sum := newMetric.SetEmptySummary()
            for _, fm := range family.Metric {
                dp := sum.DataPoints().AppendEmpty()
                dp.SetTimestamp(now)
                for _, q := range fm.GetSummary().GetQuantile() {
                    newQ := dp.QuantileValues().AppendEmpty()
                    newQ.SetValue(q.GetValue())
                    newQ.SetQuantile(q.GetQuantile())
                }
                dp.SetSum(fm.GetSummary().GetSampleSum())
                dp.SetCount(fm.GetSummary().GetSampleCount())
                for _, l := range fm.GetLabel() {
                    if l.GetValue() != "" {
                        dp.Attributes().PutStr(l.GetName(), l.GetValue())
                    }
                }
            }
        case dto.MetricType_UNTYPED:
            gauge := newMetric.SetEmptyGauge()
            for _, fm := range family.Metric {
                dp := gauge.DataPoints().AppendEmpty()
                dp.SetDoubleValue(fm.GetUntyped().GetValue())
                dp.SetTimestamp(now)
                for _, l := range fm.GetLabel() {
                    if l.GetValue() != "" {
                        dp.Attributes().PutStr(l.GetName(), l.GetValue())
                    }
                }
            }
        default:
            s.settings.Logger.Warn("Unknown metric family", zap.Any("family", family.Type))
        }
    }
}
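Until the follow-up tracked as OTL-2009 adds source-side filtering, histograms can be dropped downstream as the review thread above suggests. A rough sketch of a filter processor entry, assuming the OTTL-style conditions available in recent contrib releases (exact syntax depends on the collector version; the exporter name is illustrative):

```yaml
processors:
  filter/drop_histograms:
    metrics:
      metric:
        # Drop every histogram data stream before export.
        - 'type == METRIC_DATA_TYPE_HISTOGRAM'

service:
  pipelines:
    metrics:
      receivers: [lightprometheus]
      processors: [filter/drop_histograms]
      exporters: [signalfx]
```

Filtering at the source, once the follow-up lands, would avoid building the histogram data points in the first place.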