Skip to content

Commit

Permalink
Add scrape configs endpoint to target allocator (open-telemetry#1124)
Browse files Browse the repository at this point in the history
* Add scrape configs endpoint

* Fix a borked chan

* Fix some defaults

* comments and cleaning

* Goimports

* Pass linting

* Reuse and docs

* Update pkg/collector/container.go

Co-authored-by: Anthony Mirabella <[email protected]>

Co-authored-by: Pavol Loffay <[email protected]>
Co-authored-by: Anthony Mirabella <[email protected]>
  • Loading branch information
3 people authored Oct 4, 2022
1 parent fcb29fb commit 47b5015
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 13 deletions.
2 changes: 1 addition & 1 deletion apis/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 35 additions & 0 deletions cmd/otel-allocator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,42 @@ This configuration will be resolved to target configurations and then split acro
TargetAllocators expose the results as [HTTP_SD endpoints](https://prometheus.io/docs/prometheus/latest/http_sd/)
split by collector.

Currently, the Target Allocator handles the sharding of targets. The operator sets the `$SHARD` variable to 0 to allow
collectors to keep targets generated by a Prometheus CRD. Using Prometheus sharding and target allocator sharding is not
recommended currently and may lead to unknown results.
[See this thread for more information](https://github.com/open-telemetry/opentelemetry-operator/pull/1124#discussion_r984683577)

#### Endpoints
`/scrape_configs`:

```json
{
"job1": {
"follow_redirects": true,
"honor_timestamps": true,
"job_name": "job1",
"metric_relabel_configs": [],
"metrics_path": "/metrics",
"scheme": "http",
"scrape_interval": "1m",
"scrape_timeout": "10s",
"static_configs": []
},
"job2": {
"follow_redirects": true,
"honor_timestamps": true,
"job_name": "job2",
"metric_relabel_configs": [],
"metrics_path": "/metrics",
"relabel_configs": [],
"scheme": "http",
"scrape_interval": "1m",
"scrape_timeout": "10s",
"kubernetes_sd_configs": []
}
}
```

`/jobs`:

```json
Expand Down
10 changes: 10 additions & 0 deletions cmd/otel-allocator/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ func NewManager(log logr.Logger, ctx context.Context, logger log.Logger, options
}
}

func (m *Manager) GetScrapeConfigs() map[string]*config.ScrapeConfig {
jobToScrapeConfig := map[string]*config.ScrapeConfig{}
for _, c := range m.configsMap {
for _, scrapeConfig := range c.ScrapeConfigs {
jobToScrapeConfig[scrapeConfig.JobName] = scrapeConfig
}
}
return jobToScrapeConfig
}

func (m *Manager) ApplyConfig(source allocatorWatcher.EventSource, cfg *config.Config) error {
m.configsMap[source] = cfg

Expand Down
8 changes: 4 additions & 4 deletions cmd/otel-allocator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ module github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator
go 1.19

require (
github.com/buraksezer/consistent v0.9.0
github.com/cespare/xxhash/v2 v2.1.2
github.com/fsnotify/fsnotify v1.5.1
github.com/ghodss/yaml v1.0.0
github.com/go-kit/log v0.2.0
github.com/go-logr/logr v1.2.0
github.com/gorilla/mux v1.8.0
github.com/prometheus-operator/prometheus-operator v0.53.1
github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.53.1
github.com/prometheus-operator/prometheus-operator/pkg/client v0.53.1
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/common v0.32.1
github.com/prometheus/prometheus v1.8.2-0.20211214150951-52c693a63be1
github.com/spf13/pflag v1.0.5
Expand Down Expand Up @@ -42,8 +46,6 @@ require (
github.com/aws/aws-sdk-go v1.44.41 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/buraksezer/consistent v0.9.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe // indirect
github.com/containerd/containerd v1.5.7 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand All @@ -59,7 +61,6 @@ require (
github.com/envoyproxy/protoc-gen-validate v0.6.2 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/fatih/color v1.12.0 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/zapr v1.2.0 // indirect
github.com/go-openapi/analysis v0.20.0 // indirect
Expand Down Expand Up @@ -123,7 +124,6 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus-community/prom-label-proxy v0.4.1-0.20211215142838-1eac0933d512 // indirect
github.com/prometheus/alertmanager v0.23.1-0.20210914172521-e35efbddb66a // indirect
github.com/prometheus/client_golang v1.11.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common/sigv4 v0.1.0 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
Expand Down
41 changes: 33 additions & 8 deletions cmd/otel-allocator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ import (
"syscall"
"time"

"github.com/ghodss/yaml"
gokitlog "github.com/go-kit/log"
"github.com/go-logr/logr"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
yaml2 "gopkg.in/yaml.v2"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
ctrl "sigs.k8s.io/controller-runtime"

Expand Down Expand Up @@ -166,6 +168,7 @@ func newServer(log logr.Logger, allocator allocation.Allocator, discoveryManager
}
router := mux.NewRouter().UseEncodedPath()
router.Use(s.PrometheusMiddleware)
router.HandleFunc("/scrape_configs", s.ScrapeConfigsHandler).Methods("GET")
router.HandleFunc("/jobs", s.JobHandler).Methods("GET")
router.HandleFunc("/jobs/{job_id}/targets", s.TargetsHandler).Methods("GET")
router.Path("/metrics").Handler(promhttp.Handler())
Expand Down Expand Up @@ -204,12 +207,33 @@ func (s *server) Shutdown(ctx context.Context) error {
return s.server.Shutdown(ctx)
}

// ScrapeConfigsHandler returns the available scrape configuration discovered by the target allocator.
// The target allocator first marshals these configurations such that the underlying prometheus marshaling is used.
// After that, the YAML is converted in to a JSON format for consumers to use.
func (s *server) ScrapeConfigsHandler(w http.ResponseWriter, r *http.Request) {
configs := s.discoveryManager.GetScrapeConfigs()
configBytes, err := yaml2.Marshal(configs)
if err != nil {
s.errorHandler(w, err)
}
jsonConfig, err := yaml.YAMLToJSON(configBytes)
if err != nil {
s.errorHandler(w, err)
}
// We don't use the jsonHandler method because we don't want our bytes to be re-encoded
w.Header().Set("Content-Type", "application/json")
_, err = w.Write(jsonConfig)
if err != nil {
s.errorHandler(w, err)
}
}

func (s *server) JobHandler(w http.ResponseWriter, r *http.Request) {
displayData := make(map[string]allocation.LinkJSON)
for _, v := range s.allocator.TargetItems() {
displayData[v.JobName] = allocation.LinkJSON{Link: v.Link.Link}
}
jsonHandler(s.logger, w, displayData)
s.jsonHandler(w, displayData)
}

// PrometheusMiddleware implements mux.MiddlewareFunc.
Expand All @@ -233,33 +257,34 @@ func (s *server) TargetsHandler(w http.ResponseWriter, r *http.Request) {
params := mux.Vars(r)
jobId, err := url.QueryUnescape(params["job_id"])
if err != nil {
errorHandler(w)
s.errorHandler(w, err)
return
}

if len(q) == 0 {
displayData := allocation.GetAllTargetsByJob(jobId, compareMap, s.allocator)
jsonHandler(s.logger, w, displayData)
s.jsonHandler(w, displayData)

} else {
tgs := allocation.GetAllTargetsByCollectorAndJob(q[0], jobId, compareMap, s.allocator)
// Displays empty list if nothing matches
if len(tgs) == 0 {
jsonHandler(s.logger, w, []interface{}{})
s.jsonHandler(w, []interface{}{})
return
}
jsonHandler(s.logger, w, tgs)
s.jsonHandler(w, tgs)
}
}

func errorHandler(w http.ResponseWriter) {
func (s *server) errorHandler(w http.ResponseWriter, err error) {
w.WriteHeader(500)
s.jsonHandler(w, err)
}

func jsonHandler(logger logr.Logger, w http.ResponseWriter, data interface{}) {
func (s *server) jsonHandler(w http.ResponseWriter, data interface{}) {
w.Header().Set("Content-Type", "application/json")
err := json.NewEncoder(w).Encode(data)
if err != nil {
logger.Error(err, "failed to encode data for http response")
s.logger.Error(err, "failed to encode data for http response")
}
}
5 changes: 5 additions & 0 deletions cmd/otel-allocator/watcher/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
type FileWatcher struct {
configFilePath string
watcher *fsnotify.Watcher
closer chan bool
}

func newConfigMapWatcher(logger logr.Logger, config config.CLIConfig) (FileWatcher, error) {
Expand All @@ -38,6 +39,7 @@ func newConfigMapWatcher(logger logr.Logger, config config.CLIConfig) (FileWatch
return FileWatcher{
configFilePath: *config.ConfigFilePath,
watcher: fileWatcher,
closer: make(chan bool),
}, nil
}

Expand All @@ -51,6 +53,8 @@ func (f *FileWatcher) Start(upstreamEvents chan Event, upstreamErrors chan error
go func() {
for {
select {
case <-f.closer:
return
case fileEvent := <-f.watcher.Events:
if fileEvent.Op == fsnotify.Create {
upstreamEvents <- Event{
Expand All @@ -66,5 +70,6 @@ func (f *FileWatcher) Start(upstreamEvents chan Event, upstreamErrors chan error
}

func (f *FileWatcher) Close() error {
f.closer <- true
return f.watcher.Close()
}
12 changes: 12 additions & 0 deletions pkg/collector/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,18 @@ func Container(cfg config.Config, logger logr.Logger, otelcol v1alpha1.OpenTelem
},
})

if otelcol.Spec.TargetAllocator.Enabled {
// We need to add a SHARD here so the collector is able to keep targets after the hashmod operation which is
// added by default by the Prometheus operator's config generator.
// All collector instances use SHARD == 0 as they only receive targets
// allocated to them and should not use the Prometheus hashmod-based
// allocation.
envVars = append(envVars, corev1.EnvVar{
Name: "SHARD",
Value: "0",
})
}

var livenessProbe *corev1.Probe
if config, err := adapters.ConfigFromString(otelcol.Spec.Config); err == nil {
if probe, err := adapters.ConfigToContainerProbe(config); err == nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/collector/reconcile/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ func expectedServices(ctx context.Context, params Params, expected []corev1.Serv
updated.ObjectMeta.Labels[k] = v
}
updated.Spec.Ports = desired.Spec.Ports
updated.Spec.Selector = desired.Spec.Selector

patch := client.MergeFrom(existing)

Expand Down
16 changes: 16 additions & 0 deletions pkg/collector/reconcile/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,23 @@ func TestExpectedServices(t *testing.T) {
assert.True(t, exists)
assert.Equal(t, instanceUID, actual.OwnerReferences[0].UID)
assert.Contains(t, actual.Spec.Ports, extraPorts)
})
t.Run("should update service on version change", func(t *testing.T) {
serviceInstance := service("test-collector", params().Instance.Spec.Ports)
createObjectIfNotExists(t, "test-collector", &serviceInstance)

newService := service("test-collector", params().Instance.Spec.Ports)
newService.Spec.Selector["app.kubernetes.io/version"] = "Newest"
err := expectedServices(context.Background(), params(), []v1.Service{newService})
assert.NoError(t, err)

actual := v1.Service{}
exists, err := populateObjectIfExists(t, &actual, types.NamespacedName{Namespace: "default", Name: "test-collector"})

assert.NoError(t, err)
assert.True(t, exists)
assert.Equal(t, instanceUID, actual.OwnerReferences[0].UID)
assert.Equal(t, "Newest", actual.Spec.Selector["app.kubernetes.io/version"])
})
}

Expand Down

0 comments on commit 47b5015

Please sign in to comment.