Skip to content

Commit

Permalink
[lightprometheusreceiver] initial working draft (#2921)
Browse files Browse the repository at this point in the history
* [lightprometheusreceiver] initial working draft

Signed-off-by: Dani Louca <[email protected]>

* wip

* add to components list

* wip

* fix test

* fix import

* fix imports order

* remove comment

---------

Signed-off-by: Dani Louca <[email protected]>
Co-authored-by: Antoine Toulme <[email protected]>
  • Loading branch information
dloucasfx and atoulme authored Apr 6, 2023
1 parent c69d4b5 commit 790e5dc
Show file tree
Hide file tree
Showing 12 changed files with 1,474 additions and 2 deletions.
1 change: 1 addition & 0 deletions docs/components.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ The distribution offers support for the following components.
| [kafkametrics](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/kafkametricsreceiver) | [beta] |
| [kafka](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/kafkareceiver) | [beta] |
| [kubeletstats](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/kubeletstatsreceiver) | [beta] |
| [lightprometheus](../internal/receiver/lightprometheusreceiver) | [in development] |
| [mongodbatlas](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/mongodbatlasreceiver) | [beta] |
| [oracledb](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/oracledbreceiver) | [alpha] |
| [otlp](https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver/otlpreceiver) | [stable] |
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/windowseventlogreceiver v0.75.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/windowsperfcountersreceiver v0.75.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver v0.75.0
github.com/prometheus/client_model v0.3.0
github.com/prometheus/common v0.42.0
github.com/signalfx/golib/v3 v3.3.49
github.com/signalfx/signalfx-agent v1.0.1-0.20230222185249-54e5d1064c5b
github.com/signalfx/splunk-otel-collector/pkg/extension/smartagentextension v0.72.0
Expand Down Expand Up @@ -137,8 +139,6 @@ require (
github.com/ovh/go-ovh v1.3.0 // indirect
github.com/panta/machineid v1.0.2 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/common/sigv4 v0.1.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/prometheus/prometheus v0.43.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions internal/components/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ import (
"github.com/signalfx/splunk-otel-collector/internal/exporter/pulsarexporter"
"github.com/signalfx/splunk-otel-collector/internal/receiver/databricksreceiver"
"github.com/signalfx/splunk-otel-collector/internal/receiver/discoveryreceiver"
"github.com/signalfx/splunk-otel-collector/internal/receiver/lightprometheusreceiver"
"github.com/signalfx/splunk-otel-collector/pkg/extension/smartagentextension"
"github.com/signalfx/splunk-otel-collector/pkg/processor/timestampprocessor"
"github.com/signalfx/splunk-otel-collector/pkg/receiver/smartagentreceiver"
Expand Down Expand Up @@ -128,6 +129,7 @@ func Get() (otelcol.Factories, error) {
cloudfoundryreceiver.NewFactory(),
collectdreceiver.NewFactory(),
databricksreceiver.NewFactory(),
lightprometheusreceiver.NewFactory(),
discoveryreceiver.NewFactory(),
fluentforwardreceiver.NewFactory(),
filelogreceiver.NewFactory(),
Expand Down
1 change: 1 addition & 0 deletions internal/components/components_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func TestDefaultComponents(t *testing.T) {
"kafkametrics",
"kubeletstats",
"mongodbatlas",
"lightprometheus",
"oracledb",
"otlp",
"postgresql",
Expand Down
39 changes: 39 additions & 0 deletions internal/receiver/lightprometheusreceiver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lightprometheusreceiver

import (
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/receiver/scraperhelper"
)

func createDefaultConfig() component.Config {
scs := scraperhelper.NewDefaultScraperControllerSettings(typeStr)
// set the default collection interval to 30 seconds which is half of the
// lowest job frequency of 1 minute
scs.CollectionInterval = time.Second * 30
return &Config{
ScraperControllerSettings: scs,
HTTPClientSettings: confighttp.NewDefaultHTTPClientSettings(),
}
}

type Config struct {
confighttp.HTTPClientSettings `mapstructure:",squash"`
scraperhelper.ScraperControllerSettings `mapstructure:",squash"`
}
57 changes: 57 additions & 0 deletions internal/receiver/lightprometheusreceiver/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lightprometheusreceiver

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/scraperhelper"
)

const typeStr = "lightprometheus"

func NewFactory() receiver.Factory {
return receiver.NewFactory(
typeStr,
createDefaultConfig,
receiver.WithMetrics(createMetricsReceiver, component.StabilityLevelDevelopment),
)
}

// createMetricsReceiver creates a metrics receiver for scraping Prometheus metrics.
func createMetricsReceiver(
_ context.Context,
params receiver.CreateSettings,
rConf component.Config,
consumer consumer.Metrics,
) (receiver.Metrics, error) {
c, _ := rConf.(*Config)
s := newScraper(params, c)

scraper, err := scraperhelper.NewScraper(typeStr, s.scrape, scraperhelper.WithStart(s.start))
if err != nil {
return nil, err
}

return scraperhelper.NewScraperControllerReceiver(
&c.ScraperControllerSettings,
params,
consumer,
scraperhelper.AddScraper(scraper),
)
}
223 changes: 223 additions & 0 deletions internal/receiver/lightprometheusreceiver/scraper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lightprometheusreceiver

import (
"context"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"

dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
"go.uber.org/zap"
)

type scraper struct {
client *http.Client
settings component.TelemetrySettings
cfg *Config
name string
}

func newScraper(
settings receiver.CreateSettings,
cfg *Config,
) *scraper {
e := &scraper{
settings: settings.TelemetrySettings,
cfg: cfg,
name: settings.ID.Name(),
}

return e
}

func (s *scraper) start(_ context.Context, host component.Host) error {
var err error
s.client, err = s.cfg.ToClient(host, s.settings)
return err
}

type fetcher func() (io.ReadCloser, expfmt.Format, error)

func (s *scraper) scrape(context.Context) (pmetric.Metrics, error) {
fetch := func() (io.ReadCloser, expfmt.Format, error) {
req, err := http.NewRequest("GET", s.cfg.Endpoint, nil)
if err != nil {
return nil, expfmt.FmtUnknown, err
}

resp, err := s.client.Do(req)
if err != nil {
return nil, expfmt.FmtUnknown, err
}

if resp.StatusCode != 200 {
body, _ := io.ReadAll(resp.Body)
return nil, expfmt.FmtUnknown, fmt.Errorf("light prometheus %s returned status %d: %s", s.cfg.Endpoint, resp.StatusCode, string(body))
}
return resp.Body, expfmt.ResponseFormat(resp.Header), nil
}
return s.fetchPrometheusMetrics(fetch)
}

func (s *scraper) fetchPrometheusMetrics(fetch fetcher) (pmetric.Metrics, error) {
metricFamilies, err := s.doFetch(fetch)
m := pmetric.NewMetrics()
if err != nil {
return m, err
}

u, err := url.Parse(s.cfg.Endpoint)
if err != nil {
return m, err
}
rm := m.ResourceMetrics().AppendEmpty()
res := rm.Resource()
res.Attributes().PutStr(conventions.AttributeServiceName, s.name)
res.Attributes().PutStr(conventions.AttributeNetHostName, u.Host)
res.Attributes().PutStr(conventions.AttributeServiceInstanceID, s.name)
res.Attributes().PutStr(conventions.AttributeNetHostPort, u.Port())
res.Attributes().PutStr(conventions.AttributeHTTPScheme, u.Scheme)
s.convertMetricFamilies(metricFamilies, rm)
return m, nil
}

func (s *scraper) doFetch(fetch fetcher) ([]*dto.MetricFamily, error) {
body, expformat, err := fetch()
if err != nil {
return nil, err
}
defer body.Close()
var decoder expfmt.Decoder
// some "text" responses are missing \n from the last line
if expformat != expfmt.FmtProtoDelim {
decoder = expfmt.NewDecoder(io.MultiReader(body, strings.NewReader("\n")), expformat)
} else {
decoder = expfmt.NewDecoder(body, expformat)
}

var mfs []*dto.MetricFamily

for {
var mf dto.MetricFamily
err := decoder.Decode(&mf)

if err == io.EOF {
return mfs, nil
} else if err != nil {
return nil, err
}

mfs = append(mfs, &mf)
}
}

func (s *scraper) convertMetricFamilies(metricFamilies []*dto.MetricFamily, rm pmetric.ResourceMetrics) {
now := pcommon.NewTimestampFromTime(time.Now())

sm := rm.ScopeMetrics().AppendEmpty()
for _, family := range metricFamilies {
newMetric := sm.Metrics().AppendEmpty()
newMetric.SetName(family.GetName())
newMetric.SetDescription(family.GetHelp())
switch *family.Type {
case dto.MetricType_COUNTER:
sum := newMetric.SetEmptySum()
sum.SetIsMonotonic(true)
sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
for _, fm := range family.GetMetric() {
dp := sum.DataPoints().AppendEmpty()
dp.SetTimestamp(now)
dp.SetDoubleValue(fm.GetCounter().GetValue())
for _, l := range fm.GetLabel() {
if l.GetValue() != "" {
dp.Attributes().PutStr(l.GetName(), l.GetValue())
}
}
}
case dto.MetricType_GAUGE:
gauge := newMetric.SetEmptyGauge()
for _, fm := range family.Metric {
dp := gauge.DataPoints().AppendEmpty()
dp.SetDoubleValue(fm.GetGauge().GetValue())
dp.SetTimestamp(now)
for _, l := range fm.GetLabel() {
if l.GetValue() != "" {
dp.Attributes().PutStr(l.GetName(), l.GetValue())
}
}
}
case dto.MetricType_HISTOGRAM, dto.MetricType_GAUGE_HISTOGRAM:
histogram := newMetric.SetEmptyHistogram()
for _, fm := range family.Metric {
dp := histogram.DataPoints().AppendEmpty()
dp.SetTimestamp(now)
for _, b := range fm.GetHistogram().GetBucket() {
dp.BucketCounts().Append(b.GetCumulativeCount())
dp.ExplicitBounds().Append(b.GetUpperBound())
}
dp.SetSum(fm.GetHistogram().GetSampleSum())
dp.SetCount(fm.GetHistogram().GetSampleCount())
for _, l := range fm.GetLabel() {
if l.GetValue() != "" {
dp.Attributes().PutStr(l.GetName(), l.GetValue())
}
}
}
case dto.MetricType_SUMMARY:
sum := newMetric.SetEmptySummary()
for _, fm := range family.Metric {
dp := sum.DataPoints().AppendEmpty()
dp.SetTimestamp(now)
for _, q := range fm.GetSummary().GetQuantile() {
newQ := dp.QuantileValues().AppendEmpty()
newQ.SetValue(q.GetValue())
newQ.SetQuantile(q.GetQuantile())
}
dp.SetSum(fm.GetSummary().GetSampleSum())
dp.SetCount(fm.GetSummary().GetSampleCount())
for _, l := range fm.GetLabel() {
if l.GetValue() != "" {
dp.Attributes().PutStr(l.GetName(), l.GetValue())
}
}
}
case dto.MetricType_UNTYPED:
gauge := newMetric.SetEmptyGauge()
for _, fm := range family.Metric {
dp := gauge.DataPoints().AppendEmpty()
dp.SetDoubleValue(fm.GetUntyped().GetValue())
dp.SetTimestamp(now)
for _, l := range fm.GetLabel() {
if l.GetValue() != "" {
dp.Attributes().PutStr(l.GetName(), l.GetValue())
}
}
}
default:
s.settings.Logger.Warn("Unknown metric family", zap.Any("family", family.Type))
}
}
}
Loading

0 comments on commit 790e5dc

Please sign in to comment.