diff --git a/client/client.go b/client/client.go index 979715b07..d24f9ad34 100644 --- a/client/client.go +++ b/client/client.go @@ -4,12 +4,10 @@ import ( "context" "time" - log "github.com/sirupsen/logrus" - "github.com/projecteru2/core/auth" "github.com/projecteru2/core/client/interceptor" - _ "github.com/projecteru2/core/client/resolver/eru" - _ "github.com/projecteru2/core/client/resolver/static" + _ "github.com/projecteru2/core/client/resolver/eru" // register grpc resolver: eru:// + _ "github.com/projecteru2/core/client/resolver/static" // register grpc resolver: static:// pb "github.com/projecteru2/core/rpc/gen" "github.com/projecteru2/core/types" "github.com/projecteru2/core/utils" @@ -24,12 +22,12 @@ type Client struct { } // NewClient new a client -func NewClient(ctx context.Context, addr string, authConfig types.AuthConfig) *Client { - client := &Client{ +func NewClient(ctx context.Context, addr string, authConfig types.AuthConfig) (*Client, error) { + cc, err := dial(ctx, addr, authConfig) + return &Client{ addr: addr, - conn: dial(ctx, addr, authConfig), - } - return client + conn: cc, + }, err } // GetConn return connection @@ -42,11 +40,11 @@ func (c *Client) GetRPCClient() pb.CoreRPCClient { return pb.NewCoreRPCClient(c.conn) } -func dial(ctx context.Context, addr string, authConfig types.AuthConfig) *grpc.ClientConn { +func dial(ctx context.Context, addr string, authConfig types.AuthConfig) (*grpc.ClientConn, error) { opts := []grpc.DialOption{ grpc.WithInsecure(), grpc.WithKeepaliveParams(keepalive.ClientParameters{Time: 6 * 60 * time.Second, Timeout: time.Second}), - grpc.WithBalancerName("round_robin"), + grpc.WithBalancerName("round_robin"), // nolint:staticcheck grpc.WithUnaryInterceptor(interceptor.NewUnaryRetry(interceptor.RetryOptions{Max: 1})), grpc.WithStreamInterceptor(interceptor.NewStreamRetry(interceptor.RetryOptions{Max: 1})), } @@ -54,10 +52,6 @@ func dial(ctx context.Context, addr string, authConfig types.AuthConfig) *grpc.C opts = append(opts, grpc.WithPerRPCCredentials(auth.NewCredential(authConfig))) } - target := utils.MakeTarget(addr) - cc, err := grpc.DialContext(ctx, target, opts...) - if err != nil { - log.Panicf("[NewClient] failed to dial grpc %s: %v", addr, err) - } - return cc + target := utils.MakeTarget(addr, authConfig) + return grpc.DialContext(ctx, target, opts...) } diff --git a/client/interceptor/retry.go b/client/interceptor/retry.go index 640087b1e..7a17c491e 100644 --- a/client/interceptor/retry.go +++ b/client/interceptor/retry.go @@ -10,6 +10,7 @@ import ( "google.golang.org/grpc" ) +// NewUnaryRetry makes unary RPC retry on error func NewUnaryRetry(retryOpts RetryOptions) grpc.UnaryClientInterceptor { return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { return backoff.Retry(func() error { @@ -18,11 +19,13 @@ func NewUnaryRetry(retryOpts RetryOptions) grpc.UnaryClientInterceptor { } } +// RPCNeedRetry records rpc stream methods to retry var RPCNeedRetry = map[string]struct{}{ - "/pb.CoreRPC/ContainerStatusStream": struct{}{}, - "/pb.CoreRPC/WatchServiceStatus": struct{}{}, + "/pb.CoreRPC/ContainerStatusStream": {}, + "/pb.CoreRPC/WatchServiceStatus": {}, } +// NewStreamRetry make specific stream retry on error func NewStreamRetry(retryOpts RetryOptions) grpc.StreamClientInterceptor { return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { stream, err := streamer(ctx, desc, cc, method, opts...) diff --git a/client/interceptor/types.go b/client/interceptor/types.go index f62d1bc0f..728b28758 100644 --- a/client/interceptor/types.go +++ b/client/interceptor/types.go @@ -7,6 +7,7 @@ import ( "google.golang.org/grpc" ) +// RetryOptions . type RetryOptions struct { Max int } diff --git a/client/resolver/eru/builder.go b/client/resolver/eru/builder.go index 1613f6814..3bb2082c2 100644 --- a/client/resolver/eru/builder.go +++ b/client/resolver/eru/builder.go @@ -4,7 +4,7 @@ import "google.golang.org/grpc/resolver" type eruResolverBuilder struct{} -func init() { +func init() { // nolint resolver.Register(&eruResolverBuilder{}) } @@ -15,5 +15,5 @@ func (b *eruResolverBuilder) Scheme() string { // Build for interface func (b *eruResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { - return New(cc, target.Endpoint), nil + return New(cc, target.Endpoint, target.Authority), nil } diff --git a/client/resolver/eru/resolver.go b/client/resolver/eru/resolver.go index a3a967a2d..903310dd1 100644 --- a/client/resolver/eru/resolver.go +++ b/client/resolver/eru/resolver.go @@ -2,59 +2,69 @@ package eru import ( "context" + "strings" - "github.com/projecteru2/core/client/service_discovery" + "github.com/projecteru2/core/client/servicediscovery" + "github.com/projecteru2/core/types" log "github.com/sirupsen/logrus" "google.golang.org/grpc/resolver" ) -type eruResolver struct { +// Resolver for target eru://{addr} +type Resolver struct { cc resolver.ClientConn cancel context.CancelFunc - discovery service_discovery.ServiceDiscovery + discovery servicediscovery.ServiceDiscovery } -func New(cc resolver.ClientConn, endpoint string) *eruResolver { - r := &eruResolver{ +// New Resolver +func New(cc resolver.ClientConn, endpoint string, authority string) *Resolver { + var username, password string + if authority != "" { + parts := strings.Split(authority, ":") + username, password = strings.TrimLeft(parts[0], "@"), parts[1] + } + authConfig := types.AuthConfig{Username: username, Password: password} + r := &Resolver{ cc: cc, - discovery: service_discovery.New(endpoint), + discovery: servicediscovery.New(endpoint, authConfig), } go r.sync() return r } // ResolveNow for interface -func (r *eruResolver) ResolveNow(_ resolver.ResolveNowOptions) {} +func (r *Resolver) ResolveNow(_ resolver.ResolveNowOptions) {} // Close for interface -func (r *eruResolver) Close() { +func (r *Resolver) Close() { r.cancel() } -func (r *eruResolver) sync() { - log.Info("[eruResolver] start sync service discovery") +func (r *Resolver) sync() { + log.Info("[EruResolver] start sync service discovery") ctx, cancel := context.WithCancel(context.Background()) r.cancel = cancel defer cancel() ch, err := r.discovery.Watch(ctx) if err != nil { - log.Errorf("[eruResolver] failed to watch service status: %v", err) + log.Errorf("[EruResolver] failed to watch service status: %v", err) return } for { select { case <-ctx.Done(): - log.Errorf("[eruResolver] watch interrupted: %v", ctx.Err()) - break + log.Errorf("[EruResolver] watch interrupted: %v", ctx.Err()) + return case endpoints, ok := <-ch: if !ok { - log.Error("[eruResolver] watch closed") - break + log.Error("[EruResolver] watch closed") + return } var addresses []resolver.Address - log.Debugf("[eruResolver] update state: %v", endpoints) + log.Debugf("[EruResolver] update state: %v", endpoints) for _, ep := range endpoints { addresses = append(addresses, resolver.Address{Addr: ep}) } diff --git a/client/resolver/static/builder.go b/client/resolver/static/builder.go index bb53d70b3..eb0bb84a2 100644 --- a/client/resolver/static/builder.go +++ b/client/resolver/static/builder.go @@ -4,7 +4,7 @@ import "google.golang.org/grpc/resolver" type staticResolverBuilder struct{} -func init() { +func init() { // nolint resolver.Register(&staticResolverBuilder{}) } diff --git a/client/resolver/static/resolver.go b/client/resolver/static/resolver.go index 01fe59ccf..f127b16a4 100644 --- a/client/resolver/static/resolver.go +++ b/client/resolver/static/resolver.go @@ -6,25 +6,27 @@ import ( "google.golang.org/grpc/resolver" ) -type staticResolver struct { +// Resolver for target static://{addr1},{addr2},{addr3} +type Resolver struct { addresses []resolver.Address cc resolver.ClientConn } -func New(cc resolver.ClientConn, endpoints string) *staticResolver { +// New Resolver +func New(cc resolver.ClientConn, endpoints string) *Resolver { var addresses []resolver.Address for _, ep := range strings.Split(endpoints, ",") { addresses = append(addresses, resolver.Address{Addr: ep}) } cc.UpdateState(resolver.State{Addresses: addresses}) - return &staticResolver{ + return &Resolver{ cc: cc, addresses: addresses, } } // ResolveNow for interface -func (r *staticResolver) ResolveNow(_ resolver.ResolveNowOptions) {} +func (r *Resolver) ResolveNow(_ resolver.ResolveNowOptions) {} // Close for interface -func (r *staticResolver) Close() {} +func (r *Resolver) Close() {} diff --git a/client/servicediscovery/builder.go b/client/servicediscovery/builder.go new file mode 100644 index 000000000..e2a29cf19 --- /dev/null +++ b/client/servicediscovery/builder.go @@ -0,0 +1,27 @@ +package servicediscovery + +import "google.golang.org/grpc/resolver" + +// LBResolverBuilder for service discovery lb +type LBResolverBuilder struct { + updateCh chan []string +} + +var lbResolverBuilder *LBResolverBuilder + +func init() { // nolint + lbResolverBuilder = &LBResolverBuilder{ + updateCh: make(chan []string), + } + resolver.Register(lbResolverBuilder) +} + +// Scheme for interface +func (b *LBResolverBuilder) Scheme() string { + return "lb" +} + +// Build for interface +func (b *LBResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { + return newLBResolver(cc, target.Endpoint, b.updateCh), nil +} diff --git a/client/service_discovery/eru_service_discovery.go b/client/servicediscovery/eru_service_discovery.go similarity index 61% rename from client/service_discovery/eru_service_discovery.go rename to client/servicediscovery/eru_service_discovery.go index 0e7b5ebd7..8204dd373 100644 --- a/client/service_discovery/eru_service_discovery.go +++ b/client/servicediscovery/eru_service_discovery.go @@ -1,4 +1,4 @@ -package service_discovery +package servicediscovery import ( "context" @@ -6,24 +6,31 @@ import ( "math" "time" + "github.com/projecteru2/core/auth" "github.com/projecteru2/core/client/interceptor" pb "github.com/projecteru2/core/rpc/gen" + "github.com/projecteru2/core/types" log "github.com/sirupsen/logrus" "google.golang.org/grpc" ) -type eruServiceDiscovery struct { - endpoint string +// EruServiceDiscovery watches eru service status +type EruServiceDiscovery struct { + endpoint string + authConfig types.AuthConfig } -func New(endpoint string) *eruServiceDiscovery { - return &eruServiceDiscovery{ - endpoint: endpoint, +// New EruServiceDiscovery +func New(endpoint string, authConfig types.AuthConfig) *EruServiceDiscovery { + return &EruServiceDiscovery{ + endpoint: endpoint, + authConfig: authConfig, } } -func (w *eruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err error) { - cc, err := w.dial(ctx, w.endpoint) +// Watch . +func (w *EruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err error) { + cc, err := w.dial(ctx, w.endpoint, w.authConfig) if err != nil { log.Errorf("[EruServiceWatch] dial failed: %v", err) return @@ -36,8 +43,9 @@ func (w *eruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err watchCtx, cancelWatch := context.WithCancel(ctx) stream, err := client.WatchServiceStatus(watchCtx, &pb.Empty{}) if err != nil { - log.Errorf("[EruServiceWatch] watch failed: %v", err) - return + log.Errorf("[EruServiceWatch] watch failed, try later: %v", err) + time.Sleep(10 * time.Second) + continue } expectedInterval := time.Duration(math.MaxInt64) / time.Second @@ -52,7 +60,6 @@ func (w *eruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err case <-cancelTimer: return } - }() status, err := stream.Recv() close(cancelTimer) @@ -70,13 +77,17 @@ func (w *eruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err return ch, nil } -func (w *eruServiceDiscovery) dial(ctx context.Context, addr string) (*grpc.ClientConn, error) { +func (w *EruServiceDiscovery) dial(ctx context.Context, addr string, authConfig types.AuthConfig) (*grpc.ClientConn, error) { opts := []grpc.DialOption{ grpc.WithInsecure(), - grpc.WithBalancerName("round_robin"), + grpc.WithBalancerName("round_robin"), // nolint:staticcheck grpc.WithStreamInterceptor(interceptor.NewStreamRetry(interceptor.RetryOptions{Max: 1})), } + if authConfig.Username != "" { + opts = append(opts, grpc.WithPerRPCCredentials(auth.NewCredential(authConfig))) + } + target := makeServiceDiscoveryTarget(addr) return grpc.DialContext(ctx, target, opts...) } diff --git a/client/service_discovery/resolver.go b/client/servicediscovery/resolver.go similarity index 59% rename from client/service_discovery/resolver.go rename to client/servicediscovery/resolver.go index aa8b09197..82ecc0735 100644 --- a/client/service_discovery/resolver.go +++ b/client/servicediscovery/resolver.go @@ -1,4 +1,4 @@ -package service_discovery +package servicediscovery import ( log "github.com/sirupsen/logrus" @@ -6,27 +6,6 @@ import ( "google.golang.org/grpc/resolver" ) -type LBResolverBuilder struct { - updateCh chan []string -} - -var lbResolverBuilder *LBResolverBuilder - -func init() { - lbResolverBuilder = &LBResolverBuilder{ - updateCh: make(chan []string), - } - resolver.Register(lbResolverBuilder) -} - -func (b *LBResolverBuilder) Scheme() string { - return "lb" -} - -func (b *LBResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { - return newLBResolver(cc, target.Endpoint, b.updateCh), nil -} - type lbResolver struct { cc resolver.ClientConn } diff --git a/client/service_discovery/service_discovery.go b/client/servicediscovery/service_discovery.go similarity index 55% rename from client/service_discovery/service_discovery.go rename to client/servicediscovery/service_discovery.go index cab75a01a..ba1af8cd2 100644 --- a/client/service_discovery/service_discovery.go +++ b/client/servicediscovery/service_discovery.go @@ -1,7 +1,8 @@ -package service_discovery +package servicediscovery import "context" +// ServiceDiscovery notifies current core service addresses type ServiceDiscovery interface { Watch(context.Context) (<-chan []string, error) } diff --git a/utils/resolver.go b/utils/resolver.go index 99db8e1f0..44b50f06b 100644 --- a/utils/resolver.go +++ b/utils/resolver.go @@ -3,14 +3,17 @@ package utils import ( "fmt" "strings" + + "github.com/projecteru2/core/types" ) -var legitSchemes map[string]string = map[string]string{ +var legitSchemes = map[string]string{ "eru": "eru://", "static": "static://", } -func MakeTarget(addr string) string { +// MakeTarget return target: {scheme}://@{user}:{password}/{endpoint} +func MakeTarget(addr string, authConfig types.AuthConfig) string { scheme := "eru" for s, prefix := range legitSchemes { if strings.HasPrefix(addr, prefix) { @@ -20,5 +23,5 @@ func MakeTarget(addr string) string { } } - return fmt.Sprintf("%s://_/%s", scheme, addr) + return fmt.Sprintf("%s://@%s:%s/%s", scheme, authConfig.Username, authConfig.Password, addr) }