Skip to content

Commit

Permalink
Foundations for supporting metrics telemetry via OpenTelemetry
Browse files Browse the repository at this point in the history
Provides a basic implementation for tcp stream latency and throughput collection. Also provides a telemtry folder with an example end to end setup.
  • Loading branch information
doodlesbykumbi committed May 6, 2021
1 parent 134124d commit 1de7c61
Show file tree
Hide file tree
Showing 9 changed files with 580 additions and 10 deletions.
15 changes: 8 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/cyberark/secretless-broker
require (
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 // indirect
github.com/Microsoft/go-winio v0.4.16 // indirect
github.com/aws/aws-sdk-go v1.15.79
github.com/aws/aws-sdk-go v1.27.0
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/codegangsta/cli v1.20.0
github.com/containerd/containerd v1.3.2 // indirect
Expand All @@ -25,21 +25,22 @@ require (
github.com/heptiolabs/healthcheck v0.0.0-20180807145615-6ff867650f40
github.com/imdario/mergo v0.3.8 // indirect
github.com/joho/godotenv v1.2.0
github.com/json-iterator/go v1.1.8 // indirect
github.com/keybase/go-keychain v0.0.0-20201121013009-976c83ec27a6
github.com/lib/pq v0.0.0-20180123210206-19c8e9ad0095
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.1 // indirect
github.com/pkg/errors v0.9.1
github.com/pkg/profile v1.2.1
github.com/prometheus/client_golang v1.2.1 // indirect
github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337
github.com/smartystreets/goconvey v1.6.4
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/testify v1.5.1
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/otel v0.20.0
go.opentelemetry.io/otel/exporters/metric/prometheus v0.20.0
go.opentelemetry.io/otel/metric v0.20.0
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
golang.org/x/text v0.3.3 // indirect
gopkg.in/yaml.v2 v2.2.2
gopkg.in/yaml.v2 v2.3.0
gotest.tools v2.2.0+incompatible // indirect
k8s.io/api v0.0.0-20180712090710-2d6f90ab1293
k8s.io/apiextensions-apiserver v0.0.0-20180808065829-408db4a50408
Expand Down
279 changes: 279 additions & 0 deletions go.sum

Large diffs are not rendered by default.

81 changes: 80 additions & 1 deletion internal/plugin/connectors/tcp/proxy_service.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package tcp

import (
"context"
"fmt"
"io"
"net"
"time"

validation "github.com/go-ozzo/ozzo-validation"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/metric"

"github.com/cyberark/secretless-broker/internal"
"github.com/cyberark/secretless-broker/pkg/secretless/log"
Expand Down Expand Up @@ -45,6 +48,47 @@ type proxyService struct {
listener net.Listener
logger log.Logger
retrieveCredentials internal.CredentialsRetriever
throughputCounter metric.BoundInt64Counter
latencyRecorder metric.BoundInt64ValueRecorder
}


type ReadWriteNotifier struct {
readWriter io.ReadWriter
onWrite func(bytesWritten int, timeToHandoff time.Duration)
onRead func(bytesRead int, timeSpentBlocking time.Duration)
}

// Write implements the io.ReadWriter interface.
func (rwn *ReadWriteNotifier) Write(buffer []byte) (int, error) {
start := time.Now()
n, err := rwn.readWriter.Write(buffer)
if err == nil && rwn.onWrite != nil {
rwn.onWrite(n, time.Now().Sub(start))
}

return n, err
}

// Read implements the io.ReadWriter interface.
func (rwn *ReadWriteNotifier) Read(buffer []byte) (int, error) {
start := time.Now()
n, err := rwn.readWriter.Read(buffer)
if err == nil && rwn.onRead != nil {
rwn.onRead(n, time.Now().Sub(start))
}

return n, err
}

func AddTelemetry(
svc internal.Service,
throughputCounter metric.BoundInt64Counter,
latencyRecorder metric.BoundInt64ValueRecorder,
) {
_svc := svc.(*proxyService)
_svc.throughputCounter = throughputCounter
_svc.latencyRecorder = latencyRecorder
}

// NewProxyService constructs a new instance of a TCP ProxyService. The
Expand Down Expand Up @@ -123,7 +167,42 @@ func (proxy *proxyService) handleConnection(clientConn net.Conn) error {

logger.Debugf("Proxying connection on %v to %v.\n", clientConn.LocalAddr(), targetConn.RemoteAddr())

clientErrChan, destErrChan := duplexStream(clientConn, targetConn)
var lastTargetRead time.Time
var lastClientRead time.Time


// TODO: concurrency protections
clientErrChan, destErrChan := duplexStream(
&ReadWriteNotifier{
readWriter: clientConn,
onWrite: func(bytesWritten int, timeToHandoff time.Duration) {
// clientWrite
streamLatency := time.Now().Sub(lastTargetRead)

ctx := context.Background()
proxy.throughputCounter.Add(ctx, int64(bytesWritten))
proxy.latencyRecorder.Record(ctx, streamLatency.Microseconds())
},
onRead: func(bytesRead int, timeSpentBlocking time.Duration) {
// clientRead
lastClientRead = time.Now()
},
}, &ReadWriteNotifier{
readWriter: targetConn,
onWrite: func(bytesWritten int, timeToHandoff time.Duration) {
// targetWrite
streamLatency := time.Now().Sub(lastClientRead)

ctx := context.Background()
proxy.throughputCounter.Add(ctx, int64(bytesWritten))
proxy.latencyRecorder.Record(ctx, streamLatency.Microseconds())
},
onRead: func(bytesRead int, timeSpentBlocking time.Duration) {
// targetRead
lastTargetRead = time.Now()
},
},
)

var closer string
select {
Expand Down
24 changes: 23 additions & 1 deletion internal/proxyservice/proxy_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"strings"

"github.com/go-ozzo/ozzo-validation"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/unit"

"github.com/cyberark/secretless-broker/internal"
"github.com/cyberark/secretless-broker/internal/plugin"
Expand All @@ -33,6 +36,7 @@ type proxyServices struct {
logger logapi.Logger
resolver v1.Resolver
runningServices []internal.Service
meter metric.Meter
}

// Start starts all proxy services
Expand Down Expand Up @@ -303,12 +307,28 @@ func (s *proxyServices) createTCPService(
connResources.Logger(),
credsRetriever,
)

if err != nil {
s.logger.Errorf("could not create proxy service '%s'", config.Name)
return nil, err
}

meter := metric.Must(s.meter)
labels := []attribute.KeyValue{
attribute.String("service.name", config.Connector + ":" + "secretless"),
attribute.String("secretless.service_name", config.Name),
attribute.String("secretless.connector_name", config.Connector),
}
throughputCounter := meter.NewInt64Counter(
"secretless.tcp.stream.bytes",
metric.WithUnit(unit.Bytes),
).Bind(labels...)

latencyRecorder := meter.NewInt64ValueRecorder(
"secretless.tcp.stream.latency",
).Bind(labels...)

tcpproxy.AddTelemetry(newSvc, throughputCounter, latencyRecorder)

return newSvc, nil
}

Expand Down Expand Up @@ -337,6 +357,7 @@ func NewProxyServices(
availPlugins plugin2.AvailablePlugins,
logger logapi.Logger,
evtNotifier v1.EventNotifier,
meter metric.Meter,
) internal.Service {

// Setup our resolver
Expand All @@ -355,6 +376,7 @@ func NewProxyServices(
eventNotifier: evtNotifier,
logger: logger,
resolver: resolver,
meter: meter,
}

// TODO: v2.NewConfigsByType should be an interface, so we can remove this
Expand Down
36 changes: 35 additions & 1 deletion pkg/secretless/entrypoint/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ package entrypoint
import (
"fmt"
"log"
"net/http"
"os"
"strings"

"go.opentelemetry.io/otel/exporters/metric/prometheus"
"go.opentelemetry.io/otel/metric/global"

"github.com/cyberark/secretless-broker/internal"
"github.com/cyberark/secretless-broker/internal/configurationmanagers/configfile"
"github.com/cyberark/secretless-broker/internal/configurationmanagers/kubernetes/crd"
Expand Down Expand Up @@ -33,6 +37,27 @@ type SecretlessOptions struct {
ShowVersion bool
}

func initMeter() {
exporter, err := prometheus.InstallNewPipeline(prometheus.Config{
DefaultHistogramBoundaries: []float64{
50, 100, 500, 1000, 5000, 10000, 50000, 1000000,
},
})
if err != nil {
log.Panicf("failed to initialize prometheus exporter %v", err)
}
http.HandleFunc("/metrics", exporter.ServeHTTP)
go func() {
err := http.ListenAndServe(":2222", nil)
if err != nil {
panic(err)
}
}()

log.Println("Prometheus server running on :2222")
}


// StartSecretless method is the main entry point into the broker after the CLI
// flags have been parsed
func StartSecretless(params *SecretlessOptions) {
Expand All @@ -41,6 +66,9 @@ func StartSecretless(params *SecretlessOptions) {
return
}

initMeter()
meter := global.Meter("secretless-broker")

log.Printf("Secretless v%s starting up...", secretless.FullVersionName)

// Health check
Expand Down Expand Up @@ -95,7 +123,13 @@ func StartSecretless(params *SecretlessOptions) {
}

// Start Services
allServices = proxyservice.NewProxyServices(cfg, availPlugins, logger, evtNotifier)
allServices = proxyservice.NewProxyServices(
cfg,
availPlugins,
logger,
evtNotifier,
meter,
)
err = allServices.Start()
if err != nil {
log.Fatalf("Failed to start services: %s", err)
Expand Down
102 changes: 102 additions & 0 deletions telemetry/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Telemetry

[OpenTelemetry](https://opentelemetry.io/docs/go/getting-started/) is being
used to add telemetry to Secretless. For now the scope is to only introduce
metrics for TCP streaming.

The current implementation for metric collection is as follows
1. Define a global meter in the Secretless entrypoint
1. Use Prometheus pull exporter, and expose a metrics endpoint. Ensure that the Prometheus exported has reasonable DefaultHistogramBoundaries. TODO: Find out if they can be set per metric, it seems strange that this is set globally.
1. Create counter for throughput, and recorder for latency. Ensure the metrics are
labelled with service specific metadata (connector type, service name etc.).
+ `secretless_tcp_stream_throughput`
+ `secretless_tcp_stream_latency`

For making measurements we start by nothing that network I/O in Go is blocking. That
means reads will block a goroutine until the buffer has something. We use
`io.Copy` to implement unidirectional streaming, it takes as input a destination `io.Writer` and
a source `io.Reader`. TCP streaming in Secretless results from 2 Go routines running `io.Copy`
taking as input the client and target TCP connections, each alternating at being source and destination in the 2 Go routines. `io.Copy` handles all the reading,
writing and buffering. The `io.Copy` for each direction blocks in its goroutine
for the lifetime of the streaming.

In order to take latency and throughput measurement, we must
1. Instrument each TCP connection instance by wrapping it to intercept the start and finish
of reads and writes.
1. Measure latency as the time between when a TCP connection read unblocks, and a write returns. This applies equally to incoming and outgoing streaming, using the client as the datum

## Pending questions

- [ ] What is the impact of Telemetry, if any ?
- [ ] What is a good UX for toggling Telemetry on and off ?
- [ ] What are the pros and cons of push vs pull metric collection, and how does it impact the data available at analysis time.
- [ ] At present the implementation relies on a Prometheus pull metrics endpoint. What are the configuration options (e.g. polling interval) available and what impact do they have to the data that is available at analysis time.

## Setup

1. Run target service (e.g. postgres on 0.0.0.0:5432)
1. Update [secretless.yml](./secretless.yml) to point to target service
1. Run telemetry infrastructure (Prometheus and Grafana), `docker-compose up -d`
1. Login in to grafana. Credentials are `admin` and `admin`
1. Add prometheus datasource to grafana, URL is `host.docker.internal:9090`
1. Create dashboards using examples in [analysis](#analysis)

## Analysis

Get average latency:
```
rate(secretless_tcp_stream_latency_sum[5m])/rate(secretless_tcp_stream_latency_count[5m])
```

Average latency:
```yaml
Metrics: rate(secretless_tcp_stream_latency_sum[5m])/rate(secretless_tcp_stream_latency_count[5m])
Legend:
```
Latency buckets:
```yaml
Metrics: secretless_tcp_stream_latency_bucket
Legend: {{le}}
```
Latency Histogram:
```yaml
Metrics: histogram_quantile(0.99, sum(rate(secretless_tcp_stream_latency_bucket[5m])) by (le))
Legend: {{le}}
```
Example snapshot from metrics endpoint:
```yaml
➜ ~ curl -X POST -v localhost:2222/metrics
* Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 2222 (#0)
> POST /metrics HTTP/1.1
> Host: localhost:2222
> User-Agent: curl/7.54.0
> Accept: */*
>
< HTTP/1.1 200 OK
< Content-Type: text/plain; version=0.0.4; charset=utf-8
< Date: Thu, 06 May 2021 11:06:06 GMT
< Transfer-Encoding: chunked
<
# HELP secretless_tcp_stream_bytes
# TYPE secretless_tcp_stream_bytes counter
secretless_tcp_stream_bytes{secretless_connector_name="pg",secretless_service_name="pg_service",service_name="pg:secretless",telemetry_sdk_language="go",telemetry_sdk_name="opentelemetry",telemetry_sdk_version="0.20.0"} 383704
# HELP secretless_tcp_stream_latency
# TYPE secretless_tcp_stream_latency histogram
secretless_tcp_stream_latency_bucket{secretless_connector_name="pg",secretless_service_name="pg_service",service_name="pg:secretless",telemetry_sdk_language="go",telemetry_sdk_name="opentelemetry",telemetry_sdk_version="0.20.0",le="50"} 8043
secretless_tcp_stream_latency_bucket{secretless_connector_name="pg",secretless_service_name="pg_service",service_name="pg:secretless",telemetry_sdk_language="go",telemetry_sdk_name="opentelemetry",telemetry_sdk_version="0.20.0",le="100"} 8706
secretless_tcp_stream_latency_bucket{secretless_connector_name="pg",secretless_service_name="pg_service",service_name="pg:secretless",telemetry_sdk_language="go",telemetry_sdk_name="opentelemetry",telemetry_sdk_version="0.20.0",le="500"} 8794
secretless_tcp_stream_latency_bucket{secretless_connector_name="pg",secretless_service_name="pg_service",service_name="pg:secretless",telemetry_sdk_language="go",telemetry_sdk_name="opentelemetry",telemetry_sdk_version="0.20.0",le="1000"} 8794
secretless_tcp_stream_latency_bucket{secretless_connector_name="pg",secretless_service_name="pg_service",service_name="pg:secretless",telemetry_sdk_language="go",telemetry_sdk_name="opentelemetry",telemetry_sdk_version="0.20.0",le="5000"} 8794
secretless_tcp_stream_latency_bucket{secretless_connector_name="pg",secretless_service_name="pg_service",service_name="pg:secretless",telemetry_sdk_language="go",telemetry_sdk_name="opentelemetry",telemetry_sdk_version="0.20.0",le="10000"} 8794
secretless_tcp_stream_latency_bucket{secretless_connector_name="pg",secretless_service_name="pg_service",service_name="pg:secretless",telemetry_sdk_language="go",telemetry_sdk_name="opentelemetry",telemetry_sdk_version="0.20.0",le="50000"} 8794
secretless_tcp_stream_latency_bucket{secretless_connector_name="pg",secretless_service_name="pg_service",service_name="pg:secretless",telemetry_sdk_language="go",telemetry_sdk_name="opentelemetry",telemetry_sdk_version="0.20.0",le="1e+06"} 8794
secretless_tcp_stream_latency_bucket{secretless_connector_name="pg",secretless_service_name="pg_service",service_name="pg:secretless",telemetry_sdk_language="go",telemetry_sdk_name="opentelemetry",telemetry_sdk_version="0.20.0",le="+Inf"} 8794
secretless_tcp_stream_latency_sum{secretless_connector_name="pg",secretless_service_name="pg_service",service_name="pg:secretless",telemetry_sdk_language="go",telemetry_sdk_name="opentelemetry",telemetry_sdk_version="0.20.0"} 272060
secretless_tcp_stream_latency_count{secretless_connector_name="pg",secretless_service_name="pg_service",service_name="pg:secretless",telemetry_sdk_language="go",telemetry_sdk_name="opentelemetry",telemetry_sdk_version="0.20.0"} 8794
* Connection #0 to host localhost left intact
```
Loading

0 comments on commit 1de7c61

Please sign in to comment.