Skip to content

Commit

Permalink
Multi targets metrics support (#73)
Browse files Browse the repository at this point in the history
dbulashev authored Jan 21, 2025
1 parent e6a1039 commit ba9fea6
Showing 8 changed files with 129 additions and 28 deletions.
24 changes: 13 additions & 11 deletions internal/http/http_server.go
Original file line number Diff line number Diff line change
@@ -7,7 +7,6 @@ import (
"time"

"github.com/cherts/pgscv/internal/log"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

// AuthConfig defines configuration settings for authentication.
@@ -56,16 +55,19 @@ type Server struct {
}

// NewServer creates new HTTP server instance.
func NewServer(cfg ServerConfig) *Server {
func NewServer(cfg ServerConfig,
handlerMetrics func(http.ResponseWriter, *http.Request),
targetsMetrics func(http.ResponseWriter, *http.Request),
) *Server {
mux := http.NewServeMux()

mux.Handle("/", handleRoot())

mux.HandleFunc("/", handleRoot())
if cfg.EnableAuth {
mux.Handle("/metrics", basicAuth(cfg.AuthConfig, promhttp.Handler()))
mux.HandleFunc("/metrics", basicAuth(cfg.AuthConfig, handlerMetrics))
} else {
mux.Handle("/metrics", promhttp.Handler())
mux.HandleFunc("/metrics", handlerMetrics)
}
mux.HandleFunc("/targets", targetsMetrics)

return &Server{
config: cfg,
@@ -91,26 +93,26 @@ func (s *Server) Serve() error {
}

// handleRoot defines handler for '/' endpoint.
func handleRoot() http.Handler {
func handleRoot() http.HandlerFunc {
const htmlTemplate = `<html>
<head><title>pgSCV / PostgreSQL metrics collector</title></head>
<body>
pgSCV / PostgreSQL metrics collector, for more info visit <a href="https://github.com/cherts/pgscv">Github</a> page.
<p><a href="/metrics">Metrics</a></p>
<p><a href="/metrics">Metrics</a> (add ?target=service_id, to get metrics for one service)</p>
<p><a href="/targets">Targets</a></p>
</body>
</html>
`

return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
_, err := w.Write([]byte(htmlTemplate))
if err != nil {
log.Warnln("response write failed: ", err)
}
})
}

// basicAuth is a middleware for basic authentication.
func basicAuth(cfg AuthConfig, next http.Handler) http.Handler {
func basicAuth(cfg AuthConfig, next http.HandlerFunc) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
username, password, ok := r.BasicAuth()
if ok {
11 changes: 8 additions & 3 deletions internal/http/http_server_test.go
Original file line number Diff line number Diff line change
@@ -41,7 +41,7 @@ func TestAuthConfig_Validate(t *testing.T) {

func TestServer_Serve_HTTP(t *testing.T) {
addr := "127.0.0.1:17890"
srv := NewServer(ServerConfig{Addr: addr})
srv := NewServer(ServerConfig{Addr: addr}, getDummyHandler(), getDummyHandler())

var wg sync.WaitGroup
wg.Add(1)
@@ -63,13 +63,18 @@ func TestServer_Serve_HTTP(t *testing.T) {
}
}

func getDummyHandler() func(w http.ResponseWriter, r *http.Request) {
return func(http.ResponseWriter, *http.Request) {
}
}

func TestServer_Serve_HTTPS(t *testing.T) {
addr := "127.0.0.1:17891"
srv := NewServer(ServerConfig{Addr: addr, AuthConfig: AuthConfig{
EnableTLS: true,
Keyfile: "./testdata/example.key",
Certfile: "./testdata/example.crt",
}})
}}, getDummyHandler(), getDummyHandler())

var wg sync.WaitGroup
wg.Add(1)
@@ -129,7 +134,7 @@ func Test_basicAuth(t *testing.T) {
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
mux := http.NewServeMux()
mux.Handle("/", basicAuth(AuthConfig{Username: "user", Password: "pass"}, handleRoot()))
mux.HandleFunc("/", basicAuth(AuthConfig{Username: "user", Password: "pass"}, handleRoot()))

res := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodGet, "/", nil)
2 changes: 1 addition & 1 deletion internal/http/testing.go
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@ import (

// TestServer create http test server
func TestServer(t *testing.T, code int, response string) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
return httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, _ *http.Request) {
if code == http.StatusOK {
if response != "" {
_, err := fmt.Fprint(rw, response)
6 changes: 6 additions & 0 deletions internal/pgscv/config.go
Original file line number Diff line number Diff line change
@@ -41,6 +41,7 @@ type Config struct {
CollectTopIndex int `yaml:"collect_top_index"` // Limit elements on Indexes collector
CollectTopQuery int `yaml:"collect_top_query"` // Limit elements on Statements collector
SkipConnErrorMode bool `yaml:"skip_conn_error_mode"` // Skipping connection errors and creating a Service instance.
URLPrefix string `yaml:"url_prefix"` // Url prefix
}

// NewConfig creates new config based on config file or return default config if config file is not specified.
@@ -113,6 +114,9 @@ func NewConfig(configFilePath string) (*Config, error) {
if configFromEnv.SkipConnErrorMode {
configFromFile.SkipConnErrorMode = configFromEnv.SkipConnErrorMode
}
if configFromEnv.URLPrefix != "" {
configFromFile.URLPrefix = configFromEnv.URLPrefix
}
return configFromFile, nil
}

@@ -427,6 +431,8 @@ func newConfigFromEnv() (*Config, error) {
config.CollectTopIndex = collectTopIndex
case "PGSCV_SKIP_CONN_ERROR_MODE":
config.SkipConnErrorMode = toBool(value)
case "PGSCV_URL_PREFIX":
config.URLPrefix = value
}
}
return config, nil
78 changes: 72 additions & 6 deletions internal/pgscv/pgscv.go
Original file line number Diff line number Diff line change
@@ -3,12 +3,17 @@ package pgscv

import (
"context"
"encoding/json"
"errors"
"sync"

"fmt"
"github.com/cherts/pgscv/internal/http"
"github.com/cherts/pgscv/internal/log"
"github.com/cherts/pgscv/internal/service"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
net_http "net/http"
"strings"
"sync"
)

// Start is the application's starting point.
@@ -52,7 +57,7 @@ func Start(ctx context.Context, config *Config) error {
// Start HTTP metrics listener.
wg.Add(1)
go func() {
if err := runMetricsListener(ctx, config); err != nil {
if err := runMetricsListener(ctx, config, serviceRepo); err != nil {
errCh <- err
}
wg.Done()
@@ -74,12 +79,73 @@ func Start(ctx context.Context, config *Config) error {
}
}

// getMetricsHandler return http handler function to /metrics endpoint
func getMetricsHandler(repository *service.Repository) func(w net_http.ResponseWriter, r *net_http.Request) {
return func(w net_http.ResponseWriter, r *net_http.Request) {
target := r.URL.Query().Get("target")
if target == "" {
h := promhttp.InstrumentMetricHandler(
prometheus.DefaultRegisterer, promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{}),
)
h.ServeHTTP(w, r)
} else {
registry := repository.GetRegistry(target)
if registry == nil {
net_http.Error(w, fmt.Sprintf("Target %s not registered", target), http.StatusNotFound)
return
}
h := promhttp.InstrumentMetricHandler(
registry, promhttp.HandlerFor(registry, promhttp.HandlerOpts{}),
)
h.ServeHTTP(w, r)
}
}
}

// getTargetsHandler return http handler function to /targets endpoint
func getTargetsHandler(repository *service.Repository, urlPrefix string, enableTLS bool) func(w net_http.ResponseWriter, r *net_http.Request) {
return func(w net_http.ResponseWriter, r *net_http.Request) {
svcIDs := repository.GetServiceIDs()
targets := make([]string, len(svcIDs))
var url string
if urlPrefix != "" {
url = strings.Trim(urlPrefix, "/")
} else {
if enableTLS {
url = fmt.Sprintf("https://%s", r.Host)
} else {
url = r.Host
}
}
for i, svcID := range svcIDs {
targets[i] = fmt.Sprintf("%s/metrics?target=%s", url, svcID)
}
data := []struct {
Targets []string `json:"targets"`
}{
0: {Targets: targets},
}

jsonData, err := json.Marshal(data)
if err != nil {
net_http.Error(w, err.Error(), net_http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
_, err = w.Write(jsonData)
if err != nil {
log.Error(err.Error())
}
}
}

// runMetricsListener start HTTP listener accordingly to passed configuration.
func runMetricsListener(ctx context.Context, config *Config) error {
srv := http.NewServer(http.ServerConfig{
func runMetricsListener(ctx context.Context, config *Config, repository *service.Repository) error {
sCfg := http.ServerConfig{
Addr: config.ListenAddress,
AuthConfig: config.AuthConfig,
})
}
srv := http.NewServer(sCfg, getMetricsHandler(repository), getTargetsHandler(repository, config.URLPrefix, config.AuthConfig.EnableTLS))

errCh := make(chan error)
defer close(errCh)
2 changes: 1 addition & 1 deletion internal/pgscv/pgscv_test.go
Original file line number Diff line number Diff line change
@@ -41,7 +41,7 @@ func Test_runMetricsListener(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

err := runMetricsListener(ctx, config)
err := runMetricsListener(ctx, config, nil)
assert.NoError(t, err)
wg.Done()
}()
32 changes: 27 additions & 5 deletions internal/service/service.go
Original file line number Diff line number Diff line change
@@ -59,12 +59,14 @@ type Collector interface {
type Repository struct {
sync.RWMutex // protect concurrent access
Services map[string]Service // service repo store
Registries map[string]*prometheus.Registry
}

// NewRepository creates new services repository.
func NewRepository() *Repository {
return &Repository{
Services: make(map[string]Service),
Services: make(map[string]Service),
Registries: make(map[string]*prometheus.Registry),
}
}

@@ -89,6 +91,23 @@ func (repo *Repository) addService(s Service) {
repo.Unlock()
}

func (repo *Repository) addRegistry(serviceID string, r *prometheus.Registry) {
repo.Lock()
repo.Registries[serviceID] = r
repo.Unlock()
}

// GetRegistry returns registry with specified serviceID
func (repo *Repository) GetRegistry(serviceID string) *prometheus.Registry {
repo.RLock()
r, ok := repo.Registries[serviceID]
repo.RUnlock()
if !ok {
return nil
}
return r
}

// getService returns the service from repo with specified ID.
func (repo *Repository) getService(id string) Service {
repo.RLock()
@@ -105,8 +124,8 @@ func (repo *Repository) totalServices() int {
return size
}

// getServiceIDs returns slice of services' IDs in the repo.
func (repo *Repository) getServiceIDs() []string {
// GetServiceIDs returns slice of services' IDs in the repo.
func (repo *Repository) GetServiceIDs() []string {
var serviceIDs = make([]string, 0, repo.totalServices())
repo.RLock()
for i := range repo.Services {
@@ -141,7 +160,6 @@ func (repo *Repository) addServicesFromConfig(config Config) {
log.Warnf("%s: %s, skip", cs.BaseURL, err)
continue
}

msg = fmt.Sprintf("service [%s] available through: %s", k, cs.BaseURL)
} else {
// each ConnSetting struct is used for
@@ -188,7 +206,7 @@ func (repo *Repository) addServicesFromConfig(config Config) {
func (repo *Repository) setupServices(config Config) error {
log.Debug("config: setting up services")

for _, id := range repo.getServiceIDs() {
for _, id := range repo.GetServiceIDs() {
var service = repo.getService(id)
if service.Collector == nil {
factories := collector.Factories{}
@@ -228,6 +246,10 @@ func (repo *Repository) setupServices(config Config) error {

// Put updated service into repo.
repo.addService(service)
registry := prometheus.NewRegistry()
registry.MustRegister(service.Collector)
repo.addRegistry(service.ServiceID, registry)

log.Debugf("service configured [%s]", id)
}
}
2 changes: 1 addition & 1 deletion internal/service/service_test.go
Original file line number Diff line number Diff line change
@@ -38,7 +38,7 @@ func TestRepository_getServiceIDs(t *testing.T) {
r.addService(s2)
r.addService(s3)

ids := r.getServiceIDs()
ids := r.GetServiceIDs()
assert.Equal(t, 3, len(ids))

contains := func(ss []string, s string) bool {

0 comments on commit ba9fea6

Please sign in to comment.