Skip to content

Commit

Permalink
Merge pull request #11687 from gyuho/embed-client-version
Browse files Browse the repository at this point in the history
clientv3: fix "hasleader" metadata key, embed client version
  • Loading branch information
gyuho authored Mar 18, 2020
2 parents 07a74d6 + 3390747 commit 1c16c24
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 12 deletions.
1 change: 1 addition & 0 deletions .words
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ etcd
gRPC
goroutine
goroutines
hasleader
healthcheck
hostname
iff
Expand Down
8 changes: 0 additions & 8 deletions clientv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"google.golang.org/grpc/codes"
grpccredentials "google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

Expand Down Expand Up @@ -393,13 +392,6 @@ func (c *Client) dialWithBalancerCreds(ep string) grpccredentials.TransportCrede
return creds
}

// WithRequireLeader requires client requests to only succeed
// when the cluster has a leader.
func WithRequireLeader(ctx context.Context) context.Context {
md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
return metadata.NewOutgoingContext(ctx, md)
}

func newClient(cfg *Config) (*Client, error) {
if cfg == nil {
cfg = &Config{}
Expand Down
48 changes: 48 additions & 0 deletions clientv3/ctx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2020 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clientv3

import (
"context"

"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.etcd.io/etcd/version"
"google.golang.org/grpc/metadata"
)

// WithRequireLeader requires client requests to only succeed
// when the cluster has a leader.
func WithRequireLeader(ctx context.Context) context.Context {
md, ok := metadata.FromOutgoingContext(ctx)
if !ok { // no outgoing metadata ctx key, create one
md = metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
return metadata.NewOutgoingContext(ctx, md)
}
// overwrite/add 'hasleader' key/value
md.Set(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
return metadata.NewOutgoingContext(ctx, md)
}

// embeds client version
func withVersion(ctx context.Context) context.Context {
md, ok := metadata.FromOutgoingContext(ctx)
if !ok { // no outgoing metadata ctx key, create one
md = metadata.Pairs(rpctypes.MetadataClientAPIVersionKey, version.APIVersion)
return metadata.NewOutgoingContext(ctx, md)
}
// overwrite/add version key/value
md.Set(rpctypes.MetadataClientAPIVersionKey, version.APIVersion)
return metadata.NewOutgoingContext(ctx, md)
}
67 changes: 67 additions & 0 deletions clientv3/ctx_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2020 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clientv3

import (
"context"
"reflect"
"testing"

"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.etcd.io/etcd/version"
"google.golang.org/grpc/metadata"
)

func TestMetadataWithRequireLeader(t *testing.T) {
ctx := context.TODO()
md, ok := metadata.FromOutgoingContext(ctx)
if ok {
t.Fatal("expected no outgoing metadata ctx key")
}

// add a conflicting key with some other value
md = metadata.Pairs(rpctypes.MetadataRequireLeaderKey, "invalid")
// add a key, and expect not be overwritten
md.Set("hello", "1", "2")
ctx = metadata.NewOutgoingContext(ctx, md)

// expect overwrites but still keep other keys
ctx = WithRequireLeader(ctx)
md, ok = metadata.FromOutgoingContext(ctx)
if !ok {
t.Fatal("expected outgoing metadata ctx key")
}
if ss := md.Get(rpctypes.MetadataRequireLeaderKey); !reflect.DeepEqual(ss, []string{rpctypes.MetadataHasLeader}) {
t.Fatalf("unexpected metadata for %q %v", rpctypes.MetadataRequireLeaderKey, ss)
}
if ss := md.Get("hello"); !reflect.DeepEqual(ss, []string{"1", "2"}) {
t.Fatalf("unexpected metadata for 'hello' %v", ss)
}
}

func TestMetadataWithClientAPIVersion(t *testing.T) {
ctx := withVersion(WithRequireLeader(context.TODO()))

md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
t.Fatal("expected outgoing metadata ctx key")
}
if ss := md.Get(rpctypes.MetadataRequireLeaderKey); !reflect.DeepEqual(ss, []string{rpctypes.MetadataHasLeader}) {
t.Fatalf("unexpected metadata for %q %v", rpctypes.MetadataRequireLeaderKey, ss)
}
if ss := md.Get(rpctypes.MetadataClientAPIVersionKey); !reflect.DeepEqual(ss, []string{version.APIVersion}) {
t.Fatalf("unexpected metadata for %q %v", rpctypes.MetadataClientAPIVersionKey, ss)
}
}
19 changes: 19 additions & 0 deletions clientv3/integration/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ package integration
import (
"bytes"
"context"
"fmt"
"os"
"reflect"
"strconv"
"strings"
"testing"
"time"
Expand All @@ -28,6 +30,7 @@ import (
"go.etcd.io/etcd/integration"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/pkg/testutil"
"go.etcd.io/etcd/version"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -208,6 +211,22 @@ func TestKVPutWithRequireLeader(t *testing.T) {
t.Fatal(err)
}

cnt, err := clus.Members[0].Metric(
"etcd_server_client_requests_total",
`type="unary"`,
fmt.Sprintf(`client_api_version="%v"`, version.APIVersion),
)
if err != nil {
t.Fatal(err)
}
cv, err := strconv.ParseInt(cnt, 10, 32)
if err != nil {
t.Fatal(err)
}
if cv < 1 { // >1 when retried
t.Fatalf("expected at least 1, got %q", cnt)
}

// clients may give timeout errors since the members are stopped; take
// the clients so that terminating the cluster won't complain
clus.Client(1).Close()
Expand Down
18 changes: 18 additions & 0 deletions clientv3/integration/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"math/rand"
"reflect"
"sort"
"strconv"
"testing"
"time"

Expand All @@ -29,6 +30,7 @@ import (
"go.etcd.io/etcd/integration"
mvccpb "go.etcd.io/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/pkg/testutil"
"go.etcd.io/etcd/version"

"google.golang.org/grpc/metadata"
)
Expand Down Expand Up @@ -839,6 +841,22 @@ func TestWatchWithRequireLeader(t *testing.T) {
if _, ok := <-chNoLeader; !ok {
t.Fatalf("expected response, got closed channel")
}

cnt, err := clus.Members[0].Metric(
"etcd_server_client_requests_total",
`type="stream"`,
fmt.Sprintf(`client_api_version="%v"`, version.APIVersion),
)
if err != nil {
t.Fatal(err)
}
cv, err := strconv.ParseInt(cnt, 10, 32)
if err != nil {
t.Fatal(err)
}
if cv < 2 { // >2 when retried
t.Fatalf("expected at least 2, got %q", cnt)
}
}

// TestWatchWithFilter checks that watch filtering works.
Expand Down
2 changes: 2 additions & 0 deletions clientv3/retry_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.UnaryClientInterceptor {
intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctx = withVersion(ctx)
grpcOpts, retryOpts := filterCallOptions(opts)
callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts)
// short circuit for simplicity, and avoiding allocations.
Expand Down Expand Up @@ -103,6 +104,7 @@ func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOpt
func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.StreamClientInterceptor {
intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
ctx = withVersion(ctx)
grpcOpts, retryOpts := filterCallOptions(opts)
callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts)
// short circuit for simplicity, and avoiding allocations.
Expand Down
13 changes: 12 additions & 1 deletion etcdserver/api/v3rpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {

md, ok := metadata.FromIncomingContext(ctx)
if ok {
ver, vs := "unknown", md.Get(rpctypes.MetadataClientAPIVersionKey)
if len(vs) > 0 {
ver = vs[0]
}
clientRequests.WithLabelValues("unary", ver).Inc()

if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
if s.Leader() == types.ID(raft.None) {
return nil, rpctypes.ErrGRPCNoLeader
Expand Down Expand Up @@ -184,6 +190,12 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor

md, ok := metadata.FromIncomingContext(ss.Context())
if ok {
ver, vs := "unknown", md.Get(rpctypes.MetadataClientAPIVersionKey)
if len(vs) > 0 {
ver = vs[0]
}
clientRequests.WithLabelValues("stream", ver).Inc()

if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
if s.Leader() == types.ID(raft.None) {
return rpctypes.ErrGRPCNoLeader
Expand All @@ -202,7 +214,6 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
smap.mu.Unlock()
cancel()
}()

}
}

Expand Down
10 changes: 10 additions & 0 deletions etcdserver/api/v3rpc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,20 @@ var (
},
[]string{"Type", "API"},
)

clientRequests = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "client_requests_total",
Help: "The total number of client requests per client version.",
},
[]string{"type", "client_api_version"},
)
)

func init() {
prometheus.MustRegister(sentBytes)
prometheus.MustRegister(receivedBytes)
prometheus.MustRegister(streamFailures)
prometheus.MustRegister(clientRequests)
}
2 changes: 2 additions & 0 deletions etcdserver/api/v3rpc/rpctypes/md.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ package rpctypes
var (
MetadataRequireLeaderKey = "hasleader"
MetadataHasLeader = "true"

MetadataClientAPIVersionKey = "client-api-version"
)
17 changes: 14 additions & 3 deletions integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1143,7 +1143,7 @@ func (m *member) Terminate(t testing.TB) {
}

// Metric gets the metric value for a member
func (m *member) Metric(metricName string) (string, error) {
func (m *member) Metric(metricName string, expectLabels ...string) (string, error) {
cfgtls := transport.TLSInfo{}
tr, err := transport.NewTimeoutTransport(cfgtls, time.Second, time.Second, time.Second)
if err != nil {
Expand All @@ -1161,9 +1161,20 @@ func (m *member) Metric(metricName string) (string, error) {
}
lines := strings.Split(string(b), "\n")
for _, l := range lines {
if strings.HasPrefix(l, metricName) {
return strings.Split(l, " ")[1], nil
if !strings.HasPrefix(l, metricName) {
continue
}
ok := true
for _, lv := range expectLabels {
if !strings.Contains(l, lv) {
ok = false
break
}
}
if !ok {
continue
}
return strings.Split(l, " ")[1], nil
}
return "", nil
}
Expand Down

0 comments on commit 1c16c24

Please sign in to comment.