Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds synchronous and asynchronous OpenTelemetry metrics #42

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,22 @@

# otelpgx

Provides [OpenTelemetry](https://github.com/open-telemetry/opentelemetry-go)
Provides [OpenTelemetry](https://github.com/open-telemetry/opentelemetry-go)
instrumentation for the [jackc/pgx](https://github.com/jackc/pgx) library.

## Requirements

- go 1.18 (or higher)
- go 1.22 (or higher)
- pgx v5 (or higher)

## Usage

Make sure you have a suitable pgx version:

```bash
go get github.com/jackc/pgx/v5
```

Install the library:

```go
Expand All @@ -28,10 +34,14 @@ if err != nil {

cfg.ConnConfig.Tracer = otelpgx.NewTracer()

conn, err := pgxpool.NewWithConfig(ctx, cfg)
conn, err := pgxpool.NewWithConfig(ctx, cfg)
if err != nil {
return nil, fmt.Errorf("connect to database: %w", err)
}

if err := otelpgx.RecordStats(conn); err != nil {
return nil, fmt.Errorf("unable to record database stats: %w", err)
}
```

See [options.go](options.go) for the full list of options.
See [options.go](options.go) for the full list of options.
12 changes: 7 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
module github.com/exaring/otelpgx

go 1.20
go 1.22

toolchain go1.23.0

require (
github.com/jackc/pgx/v5 v5.6.0
go.opentelemetry.io/otel v1.23.1
go.opentelemetry.io/otel/trace v1.23.1
go.opentelemetry.io/otel v1.32.0
go.opentelemetry.io/otel/metric v1.32.0
go.opentelemetry.io/otel/trace v1.32.0
)

require (
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
go.opentelemetry.io/otel/metric v1.23.1 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
22 changes: 13 additions & 9 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
Expand All @@ -19,13 +21,14 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
go.opentelemetry.io/otel v1.23.1 h1:Za4UzOqJYS+MUczKI320AtqZHZb7EqxO00jAHE0jmQY=
go.opentelemetry.io/otel v1.23.1/go.mod h1:Td0134eafDLcTS4y+zQ26GE8u3dEuRBiBCTUIRHaikA=
go.opentelemetry.io/otel/metric v1.23.1 h1:PQJmqJ9u2QaJLBOELl1cxIdPcpbwzbkjfEyelTl2rlo=
go.opentelemetry.io/otel/metric v1.23.1/go.mod h1:mpG2QPlAfnK8yNhNJAxDZruU9Y1/HubbC+KyH8FaCWI=
go.opentelemetry.io/otel/trace v1.23.1 h1:4LrmmEd8AU2rFvU1zegmvqW7+kWarxtNOPyeL6HmYY8=
go.opentelemetry.io/otel/trace v1.23.1/go.mod h1:4IpnpJFwr1mo/6HL8XIPJaE9y0+u1KcVmuW7dwFSVrI=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U=
go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg=
go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M=
go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8=
go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM=
go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
Expand All @@ -35,3 +38,4 @@ golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
230 changes: 230 additions & 0 deletions meter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
package otelpgx

import (
"context"
"fmt"
"sync"
"time"

"github.com/jackc/pgx/v5/pgxpool"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
semconv "go.opentelemetry.io/otel/semconv/v1.27.0"
)

// defaultMinimumReadDBStatsInterval is the minimum time that must elapse
// between two reads of the pool statistics (pgxpool.Pool.Stat) when the caller
// does not configure a different interval.
const defaultMinimumReadDBStatsInterval = time.Second

// Names of the instruments reported for a pgxpool.Pool.
//
// They are constants rather than variables because they are never reassigned;
// keeping them immutable prevents accidental modification of package-level
// state. The identifier pgxPoolCancelledAcquires keeps its existing spelling
// so references elsewhere in the package continue to compile, while the
// emitted metric name uses "canceled".
const (
	pgxPoolAcquireCount            = "pgxpool.acquires"
	pgxPoolAcquireDuration         = "pgxpool.acquire_duration"
	pgxPoolAcquiredConnections     = "pgxpool.acquired_connections"
	pgxPoolCancelledAcquires       = "pgxpool.canceled_acquires"
	pgxPoolConstructingConnections = "pgxpool.constructing_connections"
	pgxPoolEmptyAcquire            = "pgxpool.empty_acquire"
	pgxPoolIdleConnections         = "pgxpool.idle_connections"
	pgxPoolMaxConnections          = "pgxpool.max_connections"
	pgxPoolMaxIdleDestroyCount     = "pgxpool.max_idle_destroys"
	pgxPoolMaxLifetimeDestroyCount = "pgxpool.max_lifetime_destroys"
	pgxPoolNewConnectionsCount     = "pgxpool.new_connections"
	pgxPoolTotalConnections        = "pgxpool.total_connections"
)

// RecordStats registers OpenTelemetry instruments that report the statistics
// of the given pgxpool.Pool.
//
// Unless overridden through opts, the instruments are created on the global
// meter provider (otel.GetMeterProvider), the pool statistics are re-read at
// most once per defaultMinimumReadDBStatsInterval, and every observation
// carries the semconv.DBSystemPostgreSQL attribute.
//
// It returns whatever error recordStats reports while setting up the
// instruments or their callback.
func RecordStats(db *pgxpool.Pool, opts ...StatsOption) error {
	settings := statsOptions{
		defaultAttributes:          []attribute.KeyValue{semconv.DBSystemPostgreSQL},
		minimumReadDBStatsInterval: defaultMinimumReadDBStatsInterval,
		meterProvider:              otel.GetMeterProvider(),
	}

	// Options are applied in the order given, so later options win.
	for i := range opts {
		opts[i].applyStatsOptions(&settings)
	}

	version := metric.WithInstrumentationVersion(findOwnImportedVersion())
	meter := settings.meterProvider.Meter(meterName, version)

	return recordStats(
		meter,
		db,
		settings.minimumReadDBStatsInterval,
		settings.defaultAttributes...,
	)
}

// recordStats creates one observable instrument per pgxpool statistic on
// meter and registers a single callback that reports all of them from a
// shared pgxpool.Stat snapshot.
//
// The snapshot is refreshed at most once per minimumReadDBStatsInterval;
// collections that arrive sooner re-report the previous snapshot. Every
// observation carries attrs plus semconv.DBSystemPostgreSQL and a connection
// pool name of the form "host:port/database" taken from the pool's
// connection config.
//
// It returns an error if db is nil, if any instrument cannot be created, or if
// the callback cannot be registered. The registration handle returned by the
// meter is discarded, so this function offers no way to unregister the
// callback.
func recordStats(
	meter metric.Meter,
	db *pgxpool.Pool,
	minimumReadDBStatsInterval time.Duration,
	attrs ...attribute.KeyValue,
) error {
	if db == nil {
		return fmt.Errorf("otelpgx: cannot record stats of a nil pool")
	}

	// Read the config once instead of once per field.
	connConfig := db.Config().ConnConfig
	poolName := fmt.Sprintf("%s:%d/%s", connConfig.Host, connConfig.Port, connConfig.Database)

	// Build the attribute list in a fresh slice so that appending never
	// writes into the backing array of the caller's variadic argument.
	allAttrs := make([]attribute.KeyValue, 0, len(attrs)+2)
	allAttrs = append(allAttrs, attrs...)
	allAttrs = append(allAttrs,
		semconv.DBSystemPostgreSQL,
		semconv.DBClientConnectionPoolName(poolName),
	)
	observeOptions := []metric.ObserveOption{
		metric.WithAttributes(allAttrs...),
	}

	var (
		dbStats     *pgxpool.Stat
		lastDBStats time.Time

		// lock is held while the instruments and callback are registered and
		// on every callback run; it serializes access to dbStats and
		// lastDBStats.
		lock sync.Mutex
	)

	// instrumentErr adds the instrument name to an instrument creation error.
	instrumentErr := func(name string, err error) error {
		return fmt.Errorf("create instrument %q: %w", name, err)
	}

	lock.Lock()
	defer lock.Unlock()

	acquireCount, err := meter.Int64ObservableCounter(
		pgxPoolAcquireCount,
		metric.WithDescription("Cumulative count of successful acquires from the pool."),
	)
	if err != nil {
		return instrumentErr(pgxPoolAcquireCount, err)
	}

	// The callback reports this value in milliseconds, so the unit and the
	// description say milliseconds as well.
	acquireDuration, err := meter.Float64ObservableCounter(
		pgxPoolAcquireDuration,
		metric.WithUnit("ms"),
		metric.WithDescription("Total duration of all successful acquires from the pool in milliseconds."),
	)
	if err != nil {
		return instrumentErr(pgxPoolAcquireDuration, err)
	}

	acquiredConns, err := meter.Int64ObservableUpDownCounter(
		pgxPoolAcquiredConnections,
		metric.WithDescription("Number of currently acquired connections in the pool."),
	)
	if err != nil {
		return instrumentErr(pgxPoolAcquiredConnections, err)
	}

	cancelledAcquires, err := meter.Int64ObservableCounter(
		pgxPoolCancelledAcquires,
		metric.WithDescription("Cumulative count of acquires from the pool that were canceled by a context."),
	)
	if err != nil {
		return instrumentErr(pgxPoolCancelledAcquires, err)
	}

	// This is a count of connections, so it carries no time unit.
	constructingConns, err := meter.Int64ObservableUpDownCounter(
		pgxPoolConstructingConnections,
		metric.WithDescription("Number of connections with construction in progress in the pool."),
	)
	if err != nil {
		return instrumentErr(pgxPoolConstructingConnections, err)
	}

	emptyAcquires, err := meter.Int64ObservableCounter(
		pgxPoolEmptyAcquire,
		metric.WithDescription("Cumulative count of successful acquires from the pool that waited for a resource to be released or constructed because the pool was empty."),
	)
	if err != nil {
		return instrumentErr(pgxPoolEmptyAcquire, err)
	}

	idleConns, err := meter.Int64ObservableUpDownCounter(
		pgxPoolIdleConnections,
		metric.WithDescription("Number of currently idle connections in the pool."),
	)
	if err != nil {
		return instrumentErr(pgxPoolIdleConnections, err)
	}

	maxConns, err := meter.Int64ObservableGauge(
		pgxPoolMaxConnections,
		metric.WithDescription("Maximum size of the pool."),
	)
	if err != nil {
		return instrumentErr(pgxPoolMaxConnections, err)
	}

	maxIdleDestroyCount, err := meter.Int64ObservableCounter(
		pgxPoolMaxIdleDestroyCount,
		metric.WithDescription("Cumulative count of connections destroyed because they exceeded MaxConnectionsIdleTime."),
	)
	if err != nil {
		return instrumentErr(pgxPoolMaxIdleDestroyCount, err)
	}

	maxLifetimeDestroyCount, err := meter.Int64ObservableCounter(
		pgxPoolMaxLifetimeDestroyCount,
		metric.WithDescription("Cumulative count of connections destroyed because they exceeded MaxConnectionsLifetime."),
	)
	if err != nil {
		return instrumentErr(pgxPoolMaxLifetimeDestroyCount, err)
	}

	newConnsCount, err := meter.Int64ObservableCounter(
		pgxPoolNewConnectionsCount,
		metric.WithDescription("Cumulative count of new connections opened."),
	)
	if err != nil {
		return instrumentErr(pgxPoolNewConnectionsCount, err)
	}

	totalConns, err := meter.Int64ObservableUpDownCounter(
		pgxPoolTotalConnections,
		metric.WithDescription("Total number of resources currently in the pool. The value is the sum of ConstructingConnections, AcquiredConnections, and IdleConnections."),
	)
	if err != nil {
		return instrumentErr(pgxPoolTotalConnections, err)
	}

	_, err = meter.RegisterCallback(
		func(ctx context.Context, o metric.Observer) error {
			lock.Lock()
			defer lock.Unlock()

			// Refresh the snapshot only when it is missing or older than the
			// configured minimum interval; otherwise reuse the previous one.
			now := time.Now()
			if dbStats == nil || now.Sub(lastDBStats) >= minimumReadDBStatsInterval {
				dbStats = db.Stat()
				lastDBStats = now
			}

			// time.Duration counts nanoseconds; convert to milliseconds.
			acquireMillis := float64(dbStats.AcquireDuration()) / float64(time.Millisecond)

			o.ObserveInt64(acquireCount, dbStats.AcquireCount(), observeOptions...)
			o.ObserveFloat64(acquireDuration, acquireMillis, observeOptions...)
			o.ObserveInt64(acquiredConns, int64(dbStats.AcquiredConns()), observeOptions...)
			o.ObserveInt64(cancelledAcquires, dbStats.CanceledAcquireCount(), observeOptions...)
			o.ObserveInt64(constructingConns, int64(dbStats.ConstructingConns()), observeOptions...)
			o.ObserveInt64(emptyAcquires, dbStats.EmptyAcquireCount(), observeOptions...)
			o.ObserveInt64(idleConns, int64(dbStats.IdleConns()), observeOptions...)
			o.ObserveInt64(maxConns, int64(dbStats.MaxConns()), observeOptions...)
			o.ObserveInt64(maxIdleDestroyCount, dbStats.MaxIdleDestroyCount(), observeOptions...)
			o.ObserveInt64(maxLifetimeDestroyCount, dbStats.MaxLifetimeDestroyCount(), observeOptions...)
			o.ObserveInt64(newConnsCount, dbStats.NewConnsCount(), observeOptions...)
			o.ObserveInt64(totalConns, int64(dbStats.TotalConns()), observeOptions...)

			return nil
		},
		acquireCount,
		acquireDuration,
		acquiredConns,
		cancelledAcquires,
		constructingConns,
		emptyAcquires,
		idleConns,
		maxConns,
		maxIdleDestroyCount,
		maxLifetimeDestroyCount,
		newConnsCount,
		totalConns,
	)
	if err != nil {
		return fmt.Errorf("register pgxpool stats callback: %w", err)
	}

	return nil
}
Loading