Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Telemetry #70

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ GOPATH := $(lastword $(subst :, ,${GOPATH}))# use last GOPATH entry

# Project information
GOVERSION := 1.13.12
PROJECT := $(CURRENT_DIR:$(GOPATH)/src/%=%)
PROJECT := $(shell go list -m -mod=vendor)
NAME := $(notdir $(PROJECT))
GIT_COMMIT ?= $(shell git rev-parse --short HEAD)
GIT_DESCRIBE ?= $(shell git describe --tags --always)
Expand Down
39 changes: 39 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,45 @@ tls_server_name = ""
ping_type = "udp"
```

#### Telemetry

Consul ESM uses the [OpenTelemetry](https://opentelemetry.io/) project as its
monitoring engine to collect various runtime metrics. It currently supports
exporting metrics to stdout, a DogStatsD server, or a Prometheus endpoint.

| Metric Name | Description |
|-|-|
| `consul-esm.coord.txn` | A counter of node check updates using Consul txn API |
| `consul-esm.check.txn` | A counter of check updates using Consul txn API |
| `consul-esm.checks.update.ms` | The duration (milliseconds) to update checks |

##### Configure Telemetry
Enable telemetry by configuring the `telemetry` block with one metric provider.

```hcl
telemetry {
metrics_prefix = "consul-esm"

stdout {
period = "60s"
pretty_print = false
do_not_print_time = false
}

dogstatsd {
// address describes the destination for exporting dogstatsd data.
// e.g., udp://host:port tcp://host:port unix:///socket/path
address = "udp://127.0.0.1:8125"
period = "60s"
}

prometheus {
cache_period = "60s"
port = 8888
}
}
```

[HCL]: https://github.com/hashicorp/hcl "HashiCorp Configuration Language (HCL)"

## Contributing
Expand Down
11 changes: 11 additions & 0 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ type Agent struct {
checkRunner *CheckRunner
id string

// instruments manages the various metric instruments used for monitoring
// the agent.
instruments *agentInstruments

shutdownCh chan struct{}
shutdown bool

Expand All @@ -73,11 +77,18 @@ func NewAgent(config *Config, logger *log.Logger) (*Agent, error) {
return nil, err
}

instr, err := newAgentInstruments()
if err != nil {
logger.Printf("[ERR] failed to setup Agent telemetry instruments: %s", err)
return nil, err
}

agent := Agent{
config: config,
client: client,
id: config.InstanceID,
logger: logger,
instruments: instr,
shutdownCh: make(chan struct{}),
inflightPings: make(map[string]struct{}),
knownNodeStatuses: make(map[string]lastKnownStatus),
Expand Down
37 changes: 37 additions & 0 deletions agent_telemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package main

import (
"context"
"fmt"

"github.com/hashicorp/hcl-opentelemetry"
"go.opentelemetry.io/otel/api/metric"
)

// agentInstruments groups the metric instruments that the Agent reports to.
type agentInstruments struct {
	// coordTxnCounter is incremented by one on every call to coordTxn.
	coordTxnCounter metric.Int64Counter
}

// newAgentInstruments creates the metric instruments used to monitor the
// Agent. Instruments are created on the meter returned by
// hclotel.GlobalMeter and their names are prefixed with
// hclotel.GlobalMeterName.
//
// If an instrument cannot be created, the returned error wraps the underlying
// cause and names the metric that failed.
func newAgentInstruments() (*agentInstruments, error) {
	meter := hclotel.GlobalMeter()
	prefix := hclotel.GlobalMeterName()

	coordTxnName := fmt.Sprintf("%s.coord.txn", prefix)
	coordTxn, err := meter.NewInt64Counter(
		coordTxnName,
		metric.WithDescription("A counter of node check updates using Consul txn API"))
	if err != nil {
		// Wrap so the caller's log line identifies the offending metric.
		return nil, fmt.Errorf("creating counter %q: %w", coordTxnName, err)
	}

	return &agentInstruments{
		coordTxnCounter: coordTxn,
	}, nil
}

// coordTxn adds one to the coord.txn counter. It is a no-op when called on a
// nil receiver, so callers do not have to check whether the instruments were
// instantiated.
func (i *agentInstruments) coordTxn() {
	if i != nil {
		i.coordTxnCounter.Add(context.Background(), 1)
	}
}
15 changes: 15 additions & 0 deletions check.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ type CheckRunner struct {
logger *log.Logger
client *api.Client

// instruments manages the various metric instruments used for monitoring
// the check runner.
instruments *checkRunnerInstruments

// checks are unmodified checks as retrieved from Consul Catalog
checks map[types.CheckID]*api.HealthCheck

Expand All @@ -51,9 +55,16 @@ type CheckRunner struct {

func NewCheckRunner(logger *log.Logger, client *api.Client, updateInterval,
minimumInterval time.Duration) *CheckRunner {
instr, err := newCheckRunnerInstruments()
if err != nil {
logger.Printf("[ERR] failed to setup CheckRunner telemetry instruments, "+
"metrics will not be reported for this runner: %s", err)
}

return &CheckRunner{
logger: logger,
client: client,
instruments: instr,
checks: make(map[types.CheckID]*api.HealthCheck),
checksHTTP: make(map[types.CheckID]*consulchecks.CheckHTTP),
checksTCP: make(map[types.CheckID]*consulchecks.CheckTCP),
Expand Down Expand Up @@ -182,6 +193,7 @@ func (c *CheckRunner) updateCheckTCP(latestCheck *api.HealthCheck, checkHash typ
// UpdateChecks takes a list of checks from the catalog and updates
// our list of running checks to match.
func (c *CheckRunner) UpdateChecks(checks api.HealthChecks) {
start := time.Now()
c.Lock()
defer c.Unlock()

Expand Down Expand Up @@ -255,6 +267,8 @@ func (c *CheckRunner) UpdateChecks(checks api.HealthChecks) {
c.logger.Printf("[INFO] Updated %d checks, found %d, added %d, updated %d, removed %d",
len(checks), len(found), len(added), len(updated), len(removed))
}

c.instruments.checksUpdate(time.Since(start))
}

// UpdateCheck handles the output of an HTTP/TCP check and decides whether or not
Expand Down Expand Up @@ -330,6 +344,7 @@ func (c *CheckRunner) handleCheckUpdate(check *api.HealthCheck, status, output s
existing.Output = output

c.logger.Printf("[INFO] Updating output and status for %q", existing.CheckID)
c.instruments.checkTxn()

ops := api.TxnOps{
&api.TxnOp{
Expand Down
59 changes: 59 additions & 0 deletions check_telemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package main

import (
"context"
"fmt"
"time"

"github.com/hashicorp/hcl-opentelemetry"
"go.opentelemetry.io/otel/api/metric"
)

// checkRunnerInstruments groups the metric instruments that the CheckRunner
// reports to.
type checkRunnerInstruments struct {
	// checkTxnCounter is incremented by one on every call to checkTxn.
	checkTxnCounter metric.Int64Counter

	// checksUpdateDuration records, in milliseconds, the durations passed to
	// checksUpdate.
	checksUpdateDuration metric.Int64ValueRecorder
}

// newCheckRunnerInstruments creates the metric instruments used to monitor
// the CheckRunner. Instruments are created on the meter returned by
// hclotel.GlobalMeter and their names are prefixed with
// hclotel.GlobalMeterName.
//
// If an instrument cannot be created, the returned error wraps the underlying
// cause and names the metric that failed, so the two failure points can be
// told apart.
func newCheckRunnerInstruments() (*checkRunnerInstruments, error) {
	meter := hclotel.GlobalMeter()
	prefix := hclotel.GlobalMeterName()

	checkTxnName := fmt.Sprintf("%s.check.txn", prefix)
	checkTxn, err := meter.NewInt64Counter(
		checkTxnName,
		metric.WithDescription("A counter of check updates using Consul txn API"))
	if err != nil {
		return nil, fmt.Errorf("creating counter %q: %w", checkTxnName, err)
	}

	checksUpdateName := fmt.Sprintf("%s.checks.update.ms", prefix)
	checksUpdate, err := meter.NewInt64ValueRecorder(
		checksUpdateName,
		metric.WithDescription("The duration (milliseconds) to update checks"))
	if err != nil {
		return nil, fmt.Errorf("creating value recorder %q: %w", checksUpdateName, err)
	}

	return &checkRunnerInstruments{
		checkTxnCounter:      checkTxn,
		checksUpdateDuration: checksUpdate,
	}, nil
}

// checkTxn adds one to the check.txn counter. It is a no-op when called on a
// nil receiver, so metrics are silently skipped if the instruments were never
// instantiated.
func (i *checkRunnerInstruments) checkTxn() {
	if i != nil {
		i.checkTxnCounter.Add(context.Background(), 1)
	}
}

// checksUpdate records dur, converted to whole milliseconds, on the
// checks.update.ms value recorder. It is a no-op when called on a nil
// receiver, so metrics are silently skipped if the instruments were never
// instantiated.
func (i *checkRunnerInstruments) checksUpdate(dur time.Duration) {
	if i != nil {
		elapsedMs := dur.Milliseconds()
		i.checksUpdateDuration.Record(context.Background(), elapsedMs)
	}
}
25 changes: 21 additions & 4 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"strings"
"time"

"github.com/hashicorp/consul-esm/version"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/command/flags"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/hcl"
"github.com/hashicorp/hcl-opentelemetry"
"github.com/hashicorp/hcl/hcl/ast"
"github.com/mitchellh/mapstructure"
)
Expand All @@ -25,6 +27,7 @@ type Config struct {
LogLevel string
EnableSyslog bool
SyslogFacility string
Telemetry *hclotel.TelemetryConfig

Service string
Tag string
Expand Down Expand Up @@ -92,6 +95,9 @@ func DefaultConfig() (*Config, error) {
return nil, err
}

tel := hclotel.DefaultTelemetryConfig()
tel.MetricsPrefix = hclotel.String(version.Name)

return &Config{
InstanceID: instanceID,
LogLevel: "INFO",
Expand All @@ -107,15 +113,17 @@ func DefaultConfig() (*Config, error) {
NodeHealthRefreshInterval: 1 * time.Hour,
NodeReconnectTimeout: 72 * time.Hour,
PingType: PingTypeUDP,
Telemetry: tel,
DisableCoordinateUpdates: false,
}, nil
}

// HumanConfig contains configuration that the practitioner can set
type HumanConfig struct {
LogLevel flags.StringValue `mapstructure:"log_level"`
EnableSyslog flags.BoolValue `mapstructure:"enable_syslog"`
SyslogFacility flags.StringValue `mapstructure:"syslog_facility"`
LogLevel flags.StringValue `mapstructure:"log_level"`
EnableSyslog flags.BoolValue `mapstructure:"enable_syslog"`
SyslogFacility flags.StringValue `mapstructure:"syslog_facility"`
Telemetry *hclotel.TelemetryConfig `mapstructure:"telemetry"`
findkim marked this conversation as resolved.
Show resolved Hide resolved

InstanceID flags.StringValue `mapstructure:"instance_id"`
Service flags.StringValue `mapstructure:"consul_service"`
Expand Down Expand Up @@ -175,7 +183,11 @@ func DecodeConfig(r io.Reader) (*HumanConfig, error) {

// Decode the simple (non service/handler) objects into Config
msdec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
DecodeHook: flags.ConfigDecodeHook,
DecodeHook: mapstructure.ComposeDecodeHookFunc(
hclotel.HookWeakDecodeFromSlice,
flags.ConfigDecodeHook,
mapstructure.StringToTimeDurationHookFunc(),
),
Result: &config,
ErrorUnused: true,
})
Expand All @@ -201,6 +213,8 @@ func BuildConfig(configFiles []string) (*Config, error) {
config.KVPath = config.KVPath + "/"
}

config.Telemetry.Finalize()

if err := ValidateConfig(config); err != nil {
return nil, fmt.Errorf("Error parsing config: %v", err)
}
Expand Down Expand Up @@ -242,6 +256,7 @@ func MergeConfigPaths(dst *Config, paths []string) error {
if err != nil {
return err
}

if !strings.HasSuffix(fi.Name(), ".json") && !strings.HasSuffix(fi.Name(), ".hcl") {
return nil
}
Expand Down Expand Up @@ -290,4 +305,6 @@ func MergeConfig(dst *Config, src *HumanConfig) {
src.TLSServerName.Merge(&dst.TLSServerName)
src.PingType.Merge(&dst.PingType)
src.DisableCoordinateUpdates.Merge(&dst.DisableCoordinateUpdates)

dst.Telemetry = dst.Telemetry.Merge(src.Telemetry)
}
2 changes: 2 additions & 0 deletions coordinate.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ func (a *Agent) updateFailedNodeTxn(node *api.Node, kvClient *api.KV, key string

// updateNodeCheck updates the node's externalNodeHealth check with the given status/output.
func (a *Agent) updateNodeCheck(node *api.Node, ops api.TxnOps, status, output string) error {
a.instruments.coordTxn()

// Update the external health check status.
ops = append(ops, &api.TxnOp{
Check: &api.CheckTxnOp{
Expand Down
11 changes: 6 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,26 @@ go 1.13
require (
github.com/DataDog/datadog-go v3.2.0+incompatible // indirect
github.com/Microsoft/go-winio v0.4.5 // indirect
github.com/google/btree v1.0.0 // indirect
github.com/hashicorp/consul v1.6.1
github.com/hashicorp/consul/api v1.2.0
github.com/hashicorp/consul/api v1.4.0
github.com/hashicorp/consul/sdk v0.4.0
github.com/hashicorp/go-immutable-radix v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.0.0
github.com/hashicorp/go-retryablehttp v0.6.3 // indirect
github.com/hashicorp/go-rootcerts v1.0.1 // indirect
github.com/hashicorp/go-sockaddr v1.0.2 // indirect
github.com/hashicorp/go-uuid v1.0.1
github.com/hashicorp/go-version v1.2.0
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/hashicorp/hcl v1.0.0
github.com/hashicorp/hcl-opentelemetry v0.0.0-20200626211046-b18a6d268b77
github.com/hashicorp/serf v0.8.4
github.com/miekg/dns v1.1.22 // indirect
github.com/mitchellh/cli v1.0.0
github.com/mitchellh/hashstructure v1.0.0 // indirect
github.com/mitchellh/mapstructure v1.1.2
github.com/prometheus/client_golang v1.2.1 // indirect
github.com/mitchellh/mapstructure v1.3.2
github.com/sparrc/go-ping v0.0.0-20190613174326-4e5b6552494c
go.opentelemetry.io/otel v0.6.0
golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc // indirect
golang.org/x/net v0.0.0-20191003171128-d98b1b443823 // indirect
google.golang.org/grpc v1.25.0 // indirect
)
Loading