Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: emit a labeled gauge of connected xDS streams by version #10243

Merged
merged 4 commits into from
May 14, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/10243.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:feature
xds: emit a labeled gauge of connected xDS streams by version
```
2 changes: 2 additions & 0 deletions agent/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/agent/xds"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging"
Expand Down Expand Up @@ -195,6 +196,7 @@ func getPrometheusDefs(cfg lib.TelemetryConfig) ([]prometheus.GaugeDefinition, [
consul.RPCGauges,
consul.SessionGauges,
grpc.StatsGauges,
xds.StatsGauges,
usagemetrics.Gauges,
consul.ReplicationGauges,
Gauges,
Expand Down
2 changes: 2 additions & 0 deletions agent/xds/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type ADSDeltaStream = envoy_discovery_v3.AggregatedDiscoveryService_DeltaAggrega

// DeltaAggregatedResources implements envoy_discovery_v3.AggregatedDiscoveryServiceServer
func (s *Server) DeltaAggregatedResources(stream ADSDeltaStream) error {
defer s.activeStreams.Increment("v3")()

// a channel for receiving incoming requests
reqCh := make(chan *envoy_discovery_v3.DeltaDiscoveryRequest)
reqStop := int32(0)
Expand Down
2 changes: 2 additions & 0 deletions agent/xds/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
// Check no response sent yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

requireProtocolVersionGauge(t, scenario, "v3", 1)

// Deliver a new snapshot (tcp with one tcp upstream)
mgr.DeliverConfig(t, sid, snap)

Expand Down
41 changes: 41 additions & 0 deletions agent/xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
envoy_discovery_v2 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"

"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-hclog"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand All @@ -25,6 +27,13 @@ import (
"github.com/hashicorp/consul/tlsutil"
)

var StatsGauges = []prometheus.GaugeDefinition{
{
Name: []string{"xds", "server", "streams"},
Help: "Measures the number of active xDS streams handled by the server split by protocol version.",
},
}

// ADSStream is a shorter way of referring to this thing...
type ADSStream = envoy_discovery_v3.AggregatedDiscoveryService_StreamAggregatedResourcesServer
type ADSStream_v2 = envoy_discovery_v2.AggregatedDiscoveryService_StreamAggregatedResourcesServer
Expand Down Expand Up @@ -141,6 +150,36 @@ type Server struct {
AuthCheckFrequency time.Duration

DisableV2Protocol bool

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I correct in assuming that there can only be one Server instance per host? Do we run multiple servers on different ports or anything like that?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is exactly one xDS server per consul agent.

activeStreams activeStreamCounters
}

// activeStreamCounters simply encapsulates two counters accessed atomically to
// ensure alignment is correct.
type activeStreamCounters struct {
xDSv3 uint64
xDSv2 uint64
}

func (c *activeStreamCounters) Increment(xdsVersion string) func() {
var counter *uint64
switch xdsVersion {
case "v3":
counter = &c.xDSv3
case "v2":
counter = &c.xDSv2
default:
return func() {}
}

count := atomic.AddUint64(counter, 1)
metrics.SetGaugeWithLabels([]string{"xds", "server", "streams"}, float32(count),
[]metrics.Label{{Name: "version", Value: xdsVersion}})
return func() {
count := atomic.AddUint64(counter, ^uint64(0))
metrics.SetGaugeWithLabels([]string{"xds", "server", "streams"}, float32(count),
[]metrics.Label{{Name: "version", Value: xdsVersion}})
rboyer marked this conversation as resolved.
Show resolved Hide resolved
}
}

func NewServer(
Expand Down Expand Up @@ -171,6 +210,8 @@ func (s *Server) StreamAggregatedResources(stream ADSStream) error {

// Deprecated: remove when xDS v2 is no longer supported
func (s *Server) streamAggregatedResources(stream ADSStream) error {
defer s.activeStreams.Increment("v2")()

// Note: despite dealing entirely in v3 protobufs, this function is
// exclusively used from the xDS v2 shim RPC handler, so the logging below
// will refer to it as "v2".
Expand Down
2 changes: 2 additions & 0 deletions agent/xds/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ func TestServer_StreamAggregatedResources_v2_BasicProtocol_TCP(t *testing.T) {
// Check no response sent yet
assertChanBlocked(t, envoy.stream.sendCh)

requireProtocolVersionGauge(t, scenario, "v2", 1)

// Deliver a new snapshot
snap := newTestSnapshot(t, nil, "")
mgr.DeliverConfig(t, sid, snap)
Expand Down
33 changes: 33 additions & 0 deletions agent/xds/xds_protocol_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
envoy_type_v3 "github.com/envoyproxy/go-control-plane/envoy/type/v3"

"github.com/armon/go-metrics"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/wrappers"
Expand Down Expand Up @@ -118,6 +119,7 @@ type testServerScenario struct {
server *Server
mgr *testManager
envoy *TestEnvoy
sink *metrics.InmemSink
errCh <-chan error
}

Expand Down Expand Up @@ -155,6 +157,16 @@ func newTestServerScenarioInner(
envoy.Close()
})

sink := metrics.NewInmemSink(1*time.Minute, 1*time.Minute)
cfg := metrics.DefaultConfig("consul.xds.test")
cfg.EnableHostname = false
rboyer marked this conversation as resolved.
Show resolved Hide resolved
metrics.NewGlobal(cfg, sink)

t.Cleanup(func() {
sink := &metrics.BlackholeSink{}
metrics.NewGlobal(cfg, sink)
})

s := NewServer(
testutil.Logger(t),
mgr,
Expand All @@ -178,6 +190,7 @@ func newTestServerScenarioInner(
server: s,
mgr: mgr,
envoy: envoy,
sink: sink,
errCh: errCh,
}
}
Expand Down Expand Up @@ -647,3 +660,23 @@ func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.FailNow()
}
}

func requireProtocolVersionGauge(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you expect that we will call this from other places? Right now it's only called from one place so it might be good to keep it next to the one call site.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's called in two places: server_test.go and delta_test.go for v2 and v3 respectively.

t *testing.T,
scenario *testServerScenario,
xdsVersion string,
expected int,
) {
data := scenario.sink.Data()
require.Len(t, data, 1)

item := data[0]
require.Len(t, item.Gauges, 1)

val, ok := item.Gauges["consul.xds.test.xds.server.streams;version="+xdsVersion]
require.True(t, ok)

require.Equal(t, "consul.xds.test.xds.server.streams", val.Name)
require.Equal(t, expected, int(val.Value))
require.Equal(t, []metrics.Label{{Name: "version", Value: xdsVersion}}, val.Labels)
}
3 changes: 2 additions & 1 deletion website/content/docs/agent/telemetry.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,8 @@ These metrics are used to monitor the health of the Consul servers.
| `consul.grpc.server.connection.count` | Counts the number of new gRPC connections received by the server. | connections | counter |
| `consul.grpc.server.connections` | Measures the number of active gRPC connections open on the server. | connections | gauge |
| `consul.grpc.server.stream.count` | Counts the number of new gRPC streams received by the server. | streams | counter |
| `consul.grpc.server.streams` | Measures the number of active gRPC streams handled by the server. | streams | guage |
| `consul.grpc.server.streams` | Measures the number of active gRPC streams handled by the server. | streams | gauge |
| `consul.xds.server.streams` | Measures the number of active xDS streams handled by the server split by protocol version. | streams | gauge |

## Cluster Health

Expand Down