Skip to content

Commit

Permalink
client: service discovery can watch core with auth
Browse files Browse the repository at this point in the history
  • Loading branch information
jschwinger233 authored and CMGS committed Aug 14, 2020
1 parent c1c82a6 commit e6b5d62
Show file tree
Hide file tree
Showing 12 changed files with 113 additions and 82 deletions.
28 changes: 11 additions & 17 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ import (
"context"
"time"

log "github.com/sirupsen/logrus"

"github.com/projecteru2/core/auth"
"github.com/projecteru2/core/client/interceptor"
_ "github.com/projecteru2/core/client/resolver/eru"
_ "github.com/projecteru2/core/client/resolver/static"
_ "github.com/projecteru2/core/client/resolver/eru" // register grpc resolver: eru://
_ "github.com/projecteru2/core/client/resolver/static" // register grpc resolver: static://
pb "github.com/projecteru2/core/rpc/gen"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
Expand All @@ -24,12 +22,12 @@ type Client struct {
}

// NewClient new a client
func NewClient(ctx context.Context, addr string, authConfig types.AuthConfig) *Client {
client := &Client{
func NewClient(ctx context.Context, addr string, authConfig types.AuthConfig) (*Client, error) {
cc, err := dial(ctx, addr, authConfig)
return &Client{
addr: addr,
conn: dial(ctx, addr, authConfig),
}
return client
conn: cc,
}, err
}

// GetConn return connection
Expand All @@ -42,22 +40,18 @@ func (c *Client) GetRPCClient() pb.CoreRPCClient {
return pb.NewCoreRPCClient(c.conn)
}

func dial(ctx context.Context, addr string, authConfig types.AuthConfig) *grpc.ClientConn {
func dial(ctx context.Context, addr string, authConfig types.AuthConfig) (*grpc.ClientConn, error) {
opts := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithKeepaliveParams(keepalive.ClientParameters{Time: 6 * 60 * time.Second, Timeout: time.Second}),
grpc.WithBalancerName("round_robin"),
grpc.WithBalancerName("round_robin"), // nolint:staticcheck
grpc.WithUnaryInterceptor(interceptor.NewUnaryRetry(interceptor.RetryOptions{Max: 1})),
grpc.WithStreamInterceptor(interceptor.NewStreamRetry(interceptor.RetryOptions{Max: 1})),
}
if authConfig.Username != "" {
opts = append(opts, grpc.WithPerRPCCredentials(auth.NewCredential(authConfig)))
}

target := utils.MakeTarget(addr)
cc, err := grpc.DialContext(ctx, target, opts...)
if err != nil {
log.Panicf("[NewClient] failed to dial grpc %s: %v", addr, err)
}
return cc
target := utils.MakeTarget(addr, authConfig)
return grpc.DialContext(ctx, target, opts...)
}
7 changes: 5 additions & 2 deletions client/interceptor/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"google.golang.org/grpc"
)

// NewUnaryRetry makes unary RPC retry on error
func NewUnaryRetry(retryOpts RetryOptions) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return backoff.Retry(func() error {
Expand All @@ -18,11 +19,13 @@ func NewUnaryRetry(retryOpts RetryOptions) grpc.UnaryClientInterceptor {
}
}

// RPCNeedRetry records rpc stream methods to retry
var RPCNeedRetry = map[string]struct{}{
"/pb.CoreRPC/ContainerStatusStream": struct{}{},
"/pb.CoreRPC/WatchServiceStatus": struct{}{},
"/pb.CoreRPC/ContainerStatusStream": {},
"/pb.CoreRPC/WatchServiceStatus": {},
}

// NewStreamRetry make specific stream retry on error
func NewStreamRetry(retryOpts RetryOptions) grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
stream, err := streamer(ctx, desc, cc, method, opts...)
Expand Down
1 change: 1 addition & 0 deletions client/interceptor/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"google.golang.org/grpc"
)

// RetryOptions .
type RetryOptions struct {
Max int
}
Expand Down
4 changes: 2 additions & 2 deletions client/resolver/eru/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import "google.golang.org/grpc/resolver"

type eruResolverBuilder struct{}

func init() {
func init() { // nolint
resolver.Register(&eruResolverBuilder{})
}

Expand All @@ -15,5 +15,5 @@ func (b *eruResolverBuilder) Scheme() string {

// Build for interface
func (b *eruResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
return New(cc, target.Endpoint), nil
return New(cc, target.Endpoint, target.Authority), nil
}
42 changes: 26 additions & 16 deletions client/resolver/eru/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,59 +2,69 @@ package eru

import (
"context"
"strings"

"github.com/projecteru2/core/client/service_discovery"
"github.com/projecteru2/core/client/servicediscovery"
"github.com/projecteru2/core/types"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/resolver"
)

type eruResolver struct {
// Resolver for target eru://{addr}
type Resolver struct {
cc resolver.ClientConn
cancel context.CancelFunc
discovery service_discovery.ServiceDiscovery
discovery servicediscovery.ServiceDiscovery
}

func New(cc resolver.ClientConn, endpoint string) *eruResolver {
r := &eruResolver{
// New Resolver
func New(cc resolver.ClientConn, endpoint string, authority string) *Resolver {
var username, password string
if authority != "" {
parts := strings.Split(authority, ":")
username, password = strings.TrimLeft(parts[0], "@"), parts[1]
}
authConfig := types.AuthConfig{Username: username, Password: password}
r := &Resolver{
cc: cc,
discovery: service_discovery.New(endpoint),
discovery: servicediscovery.New(endpoint, authConfig),
}
go r.sync()
return r
}

// ResolveNow for interface
func (r *eruResolver) ResolveNow(_ resolver.ResolveNowOptions) {}
func (r *Resolver) ResolveNow(_ resolver.ResolveNowOptions) {}

// Close for interface
func (r *eruResolver) Close() {
func (r *Resolver) Close() {
r.cancel()
}

func (r *eruResolver) sync() {
log.Info("[eruResolver] start sync service discovery")
func (r *Resolver) sync() {
log.Info("[EruResolver] start sync service discovery")
ctx, cancel := context.WithCancel(context.Background())
r.cancel = cancel
defer cancel()

ch, err := r.discovery.Watch(ctx)
if err != nil {
log.Errorf("[eruResolver] failed to watch service status: %v", err)
log.Errorf("[EruResolver] failed to watch service status: %v", err)
return
}
for {
select {
case <-ctx.Done():
log.Errorf("[eruResolver] watch interrupted: %v", ctx.Err())
break
log.Errorf("[EruResolver] watch interrupted: %v", ctx.Err())
return
case endpoints, ok := <-ch:
if !ok {
log.Error("[eruResolver] watch closed")
break
log.Error("[EruResolver] watch closed")
return
}

var addresses []resolver.Address
log.Debugf("[eruResolver] update state: %v", endpoints)
log.Debugf("[EruResolver] update state: %v", endpoints)
for _, ep := range endpoints {
addresses = append(addresses, resolver.Address{Addr: ep})
}
Expand Down
2 changes: 1 addition & 1 deletion client/resolver/static/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import "google.golang.org/grpc/resolver"

type staticResolverBuilder struct{}

func init() {
func init() { // nolint
resolver.Register(&staticResolverBuilder{})
}

Expand Down
12 changes: 7 additions & 5 deletions client/resolver/static/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,27 @@ import (
"google.golang.org/grpc/resolver"
)

type staticResolver struct {
// Resolver for target static://{addr1},{addr2},{addr3}
type Resolver struct {
addresses []resolver.Address
cc resolver.ClientConn
}

func New(cc resolver.ClientConn, endpoints string) *staticResolver {
// New Resolver
func New(cc resolver.ClientConn, endpoints string) *Resolver {
var addresses []resolver.Address
for _, ep := range strings.Split(endpoints, ",") {
addresses = append(addresses, resolver.Address{Addr: ep})
}
cc.UpdateState(resolver.State{Addresses: addresses})
return &staticResolver{
return &Resolver{
cc: cc,
addresses: addresses,
}
}

// ResolveNow for interface
func (r *staticResolver) ResolveNow(_ resolver.ResolveNowOptions) {}
func (r *Resolver) ResolveNow(_ resolver.ResolveNowOptions) {}

// Close for interface
func (r *staticResolver) Close() {}
func (r *Resolver) Close() {}
27 changes: 27 additions & 0 deletions client/servicediscovery/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package servicediscovery

import "google.golang.org/grpc/resolver"

// LBResolverBuilder for service discovery lb
type LBResolverBuilder struct {
updateCh chan []string
}

var lbResolverBuilder *LBResolverBuilder

func init() { // nolint
lbResolverBuilder = &LBResolverBuilder{
updateCh: make(chan []string),
}
resolver.Register(lbResolverBuilder)
}

// Scheme for interface
func (b *LBResolverBuilder) Scheme() string {
return "lb"
}

// Build for interface
func (b *LBResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
return newLBResolver(cc, target.Endpoint, b.updateCh), nil
}
Original file line number Diff line number Diff line change
@@ -1,29 +1,36 @@
package service_discovery
package servicediscovery

import (
"context"
"fmt"
"math"
"time"

"github.com/projecteru2/core/auth"
"github.com/projecteru2/core/client/interceptor"
pb "github.com/projecteru2/core/rpc/gen"
"github.com/projecteru2/core/types"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
)

type eruServiceDiscovery struct {
endpoint string
// EruServiceDiscovery watches eru service status
type EruServiceDiscovery struct {
endpoint string
authConfig types.AuthConfig
}

func New(endpoint string) *eruServiceDiscovery {
return &eruServiceDiscovery{
endpoint: endpoint,
// New EruServiceDiscovery
func New(endpoint string, authConfig types.AuthConfig) *EruServiceDiscovery {
return &EruServiceDiscovery{
endpoint: endpoint,
authConfig: authConfig,
}
}

func (w *eruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err error) {
cc, err := w.dial(ctx, w.endpoint)
// Watch .
func (w *EruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err error) {
cc, err := w.dial(ctx, w.endpoint, w.authConfig)
if err != nil {
log.Errorf("[EruServiceWatch] dial failed: %v", err)
return
Expand All @@ -36,8 +43,9 @@ func (w *eruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err
watchCtx, cancelWatch := context.WithCancel(ctx)
stream, err := client.WatchServiceStatus(watchCtx, &pb.Empty{})
if err != nil {
log.Errorf("[EruServiceWatch] watch failed: %v", err)
return
log.Errorf("[EruServiceWatch] watch failed, try later: %v", err)
time.Sleep(10 * time.Second)
continue
}
expectedInterval := time.Duration(math.MaxInt64) / time.Second

Expand All @@ -52,7 +60,6 @@ func (w *eruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err
case <-cancelTimer:
return
}

}()
status, err := stream.Recv()
close(cancelTimer)
Expand All @@ -70,13 +77,17 @@ func (w *eruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err
return ch, nil
}

func (w *eruServiceDiscovery) dial(ctx context.Context, addr string) (*grpc.ClientConn, error) {
func (w *EruServiceDiscovery) dial(ctx context.Context, addr string, authConfig types.AuthConfig) (*grpc.ClientConn, error) {
opts := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithBalancerName("round_robin"),
grpc.WithBalancerName("round_robin"), // nolint:staticcheck
grpc.WithStreamInterceptor(interceptor.NewStreamRetry(interceptor.RetryOptions{Max: 1})),
}

if authConfig.Username != "" {
opts = append(opts, grpc.WithPerRPCCredentials(auth.NewCredential(authConfig)))
}

target := makeServiceDiscoveryTarget(addr)
return grpc.DialContext(ctx, target, opts...)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,32 +1,11 @@
package service_discovery
package servicediscovery

import (
log "github.com/sirupsen/logrus"

"google.golang.org/grpc/resolver"
)

type LBResolverBuilder struct {
updateCh chan []string
}

var lbResolverBuilder *LBResolverBuilder

func init() {
lbResolverBuilder = &LBResolverBuilder{
updateCh: make(chan []string),
}
resolver.Register(lbResolverBuilder)
}

func (b *LBResolverBuilder) Scheme() string {
return "lb"
}

func (b *LBResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
return newLBResolver(cc, target.Endpoint, b.updateCh), nil
}

type lbResolver struct {
cc resolver.ClientConn
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package service_discovery
package servicediscovery

import "context"

// ServiceDiscovery notifies current core service addresses
type ServiceDiscovery interface {
Watch(context.Context) (<-chan []string, error)
}
Loading

0 comments on commit e6b5d62

Please sign in to comment.