Skip to content

Commit

Permalink
xds: emit a labeled gauge of connected xDS streams by version (#10243)
Browse files Browse the repository at this point in the history
Fixes #10099
  • Loading branch information
rboyer authored and hc-github-team-consul-core committed May 14, 2021
1 parent e83dc43 commit 4025a63
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .changelog/10243.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:feature
xds: emit a labeled gauge of connected xDS streams by version
```
2 changes: 2 additions & 0 deletions agent/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/agent/xds"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging"
Expand Down Expand Up @@ -195,6 +196,7 @@ func getPrometheusDefs(cfg lib.TelemetryConfig) ([]prometheus.GaugeDefinition, [
consul.RPCGauges,
consul.SessionGauges,
grpc.StatsGauges,
xds.StatsGauges,
usagemetrics.Gauges,
consul.ReplicationGauges,
Gauges,
Expand Down
2 changes: 2 additions & 0 deletions agent/xds/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type ADSDeltaStream = envoy_discovery_v3.AggregatedDiscoveryService_DeltaAggrega

// DeltaAggregatedResources implements envoy_discovery_v3.AggregatedDiscoveryServiceServer
func (s *Server) DeltaAggregatedResources(stream ADSDeltaStream) error {
defer s.activeStreams.Increment("v3")()

// a channel for receiving incoming requests
reqCh := make(chan *envoy_discovery_v3.DeltaDiscoveryRequest)
reqStop := int32(0)
Expand Down
2 changes: 2 additions & 0 deletions agent/xds/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
// Check no response sent yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

requireProtocolVersionGauge(t, scenario, "v3", 1)

// Deliver a new snapshot (tcp with one tcp upstream)
mgr.DeliverConfig(t, sid, snap)

Expand Down
41 changes: 41 additions & 0 deletions agent/xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
envoy_discovery_v2 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"

"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-hclog"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand All @@ -25,6 +27,13 @@ import (
"github.com/hashicorp/consul/tlsutil"
)

// StatsGauges lists the Prometheus gauge definitions for the metrics emitted
// by the xDS server. It currently defines a single gauge, keyed
// xds/server/streams, which activeStreamCounters.Increment sets with a
// "version" label for each xDS protocol version.
var StatsGauges = []prometheus.GaugeDefinition{
{
Name: []string{"xds", "server", "streams"},
Help: "Measures the number of active xDS streams handled by the server split by protocol version.",
},
}

// ADSStream is a shorter way of referring to this thing...
type ADSStream = envoy_discovery_v3.AggregatedDiscoveryService_StreamAggregatedResourcesServer
type ADSStream_v2 = envoy_discovery_v2.AggregatedDiscoveryService_StreamAggregatedResourcesServer
Expand Down Expand Up @@ -141,6 +150,36 @@ type Server struct {
AuthCheckFrequency time.Duration

DisableV2Protocol bool

activeStreams activeStreamCounters
}

// activeStreamCounters holds one counter of currently-open xDS streams per
// protocol version. Increment updates the counters with sync/atomic, so they
// are grouped in a dedicated struct made only of 64-bit words to ensure
// alignment is correct.
//
// NOTE(review): Server holds this struct by value, after other fields. 64-bit
// atomic operations on 32-bit platforms require 64-bit-aligned addresses;
// confirm that this placement guarantees alignment there, or hold the struct
// behind a pointer.
type activeStreamCounters struct {
// xDSv3 is the number of active streams counted under the "v3" version.
xDSv3 uint64
// xDSv2 is the number of active streams counted under the "v2" version.
xDSv2 uint64
}

// Increment records the start of one xDS stream for the given protocol
// version ("v3" or "v2"). It atomically bumps that version's counter and
// publishes the new total as the xds/server/streams gauge, labeled with the
// version.
//
// The returned function reverses the increment and republishes the gauge; it
// is intended to be deferred for the lifetime of the stream. For any other
// version string nothing is counted and the returned function does nothing.
func (c *activeStreamCounters) Increment(xdsVersion string) func() {
	// Pick the counter that belongs to this protocol version.
	var target *uint64
	switch xdsVersion {
	case "v2":
		target = &c.xDSv2
	case "v3":
		target = &c.xDSv3
	}
	if target == nil {
		// Unknown version: there is nothing to track or undo.
		return func() {}
	}

	labels := []metrics.Label{{Name: "version", Value: xdsVersion}}

	// publish reports the given stream total on the version-labeled gauge.
	publish := func(active uint64) {
		metrics.SetGaugeWithLabels([]string{"xds", "server", "streams"}, float32(active), labels)
	}

	publish(atomic.AddUint64(target, 1))

	return func() {
		// Adding the all-ones value wraps around, which subtracts one from
		// an unsigned counter.
		const minusOne = ^uint64(0)
		publish(atomic.AddUint64(target, minusOne))
	}
}

func NewServer(
Expand Down Expand Up @@ -171,6 +210,8 @@ func (s *Server) StreamAggregatedResources(stream ADSStream) error {

// Deprecated: remove when xDS v2 is no longer supported
func (s *Server) streamAggregatedResources(stream ADSStream) error {
defer s.activeStreams.Increment("v2")()

// Note: despite dealing entirely in v3 protobufs, this function is
// exclusively used from the xDS v2 shim RPC handler, so the logging below
// will refer to it as "v2".
Expand Down
2 changes: 2 additions & 0 deletions agent/xds/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ func TestServer_StreamAggregatedResources_v2_BasicProtocol_TCP(t *testing.T) {
// Check no response sent yet
assertChanBlocked(t, envoy.stream.sendCh)

requireProtocolVersionGauge(t, scenario, "v2", 1)

// Deliver a new snapshot
snap := newTestSnapshot(t, nil, "")
mgr.DeliverConfig(t, sid, snap)
Expand Down
34 changes: 34 additions & 0 deletions agent/xds/xds_protocol_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
envoy_type_v3 "github.com/envoyproxy/go-control-plane/envoy/type/v3"

"github.com/armon/go-metrics"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/wrappers"
Expand Down Expand Up @@ -118,6 +119,7 @@ type testServerScenario struct {
server *Server
mgr *testManager
envoy *TestEnvoy
sink *metrics.InmemSink
errCh <-chan error
}

Expand Down Expand Up @@ -155,6 +157,17 @@ func newTestServerScenarioInner(
envoy.Close()
})

sink := metrics.NewInmemSink(1*time.Minute, 1*time.Minute)
cfg := metrics.DefaultConfig("consul.xds.test")
cfg.EnableHostname = false
cfg.EnableRuntimeMetrics = false
metrics.NewGlobal(cfg, sink)

t.Cleanup(func() {
sink := &metrics.BlackholeSink{}
metrics.NewGlobal(cfg, sink)
})

s := NewServer(
testutil.Logger(t),
mgr,
Expand All @@ -178,6 +191,7 @@ func newTestServerScenarioInner(
server: s,
mgr: mgr,
envoy: envoy,
sink: sink,
errCh: errCh,
}
}
Expand Down Expand Up @@ -647,3 +661,23 @@ func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.FailNow()
}
}

// requireProtocolVersionGauge fails the test unless the scenario's in-memory
// metrics sink holds exactly one interval containing exactly one gauge, and
// that gauge is the xds.server.streams gauge (under the "consul.xds.test"
// service prefix) labeled with xdsVersion and carrying the value expected.
func requireProtocolVersionGauge(
	t *testing.T,
	scenario *testServerScenario,
	xdsVersion string,
	expected int,
) {
	// Attribute assertion failures to the calling test rather than this helper.
	t.Helper()

	data := scenario.sink.Data()
	require.Len(t, data, 1)

	item := data[0]
	require.Len(t, item.Gauges, 1)

	// The in-memory sink keys labeled gauges as "<name>;<label>=<value>".
	key := "consul.xds.test.xds.server.streams;version=" + xdsVersion
	val, ok := item.Gauges[key]
	require.True(t, ok, "gauge %q not found in sink", key)

	require.Equal(t, "consul.xds.test.xds.server.streams", val.Name)
	require.Equal(t, expected, int(val.Value))
	require.Equal(t, []metrics.Label{{Name: "version", Value: xdsVersion}}, val.Labels)
}
3 changes: 2 additions & 1 deletion website/content/docs/agent/telemetry.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,8 @@ These metrics are used to monitor the health of the Consul servers.
| `consul.grpc.server.connection.count` | Counts the number of new gRPC connections received by the server. | connections | counter |
| `consul.grpc.server.connections` | Measures the number of active gRPC connections open on the server. | connections | gauge |
| `consul.grpc.server.stream.count` | Counts the number of new gRPC streams received by the server. | streams | counter |
| `consul.grpc.server.streams` | Measures the number of active gRPC streams handled by the server. | streams | guage |
| `consul.grpc.server.streams` | Measures the number of active gRPC streams handled by the server. | streams | gauge |
| `consul.xds.server.streams` | Measures the number of active xDS streams handled by the server split by protocol version. | streams | gauge |

## Cluster Health

Expand Down

0 comments on commit 4025a63

Please sign in to comment.