From 8a1956c30f5a5b5f659fcde1910176ad85c75e14 Mon Sep 17 00:00:00 2001 From: zc Date: Thu, 13 Aug 2020 15:07:48 +0800 Subject: [PATCH] client: discovery watches service status change --- .../eru_service_discovery.go | 86 +++++++++++++++++++ client/service_discovery/resolver.go | 56 ++++++++++++ client/service_discovery/service_discovery.go | 7 ++ 3 files changed, 149 insertions(+) create mode 100644 client/service_discovery/eru_service_discovery.go create mode 100644 client/service_discovery/resolver.go create mode 100644 client/service_discovery/service_discovery.go diff --git a/client/service_discovery/eru_service_discovery.go b/client/service_discovery/eru_service_discovery.go new file mode 100644 index 000000000..0e7b5ebd7 --- /dev/null +++ b/client/service_discovery/eru_service_discovery.go @@ -0,0 +1,86 @@ +package service_discovery + +import ( + "context" + "fmt" + "math" + "time" + + "github.com/projecteru2/core/client/interceptor" + pb "github.com/projecteru2/core/rpc/gen" + log "github.com/sirupsen/logrus" + "google.golang.org/grpc" +) + +type eruServiceDiscovery struct { + endpoint string +} + +func New(endpoint string) *eruServiceDiscovery { + return &eruServiceDiscovery{ + endpoint: endpoint, + } +} + +func (w *eruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err error) { + cc, err := w.dial(ctx, w.endpoint) + if err != nil { + log.Errorf("[EruServiceWatch] dial failed: %v", err) + return + } + client := pb.NewCoreRPCClient(cc) + ch := make(chan []string) + go func() { + defer close(ch) + for { + watchCtx, cancelWatch := context.WithCancel(ctx) + stream, err := client.WatchServiceStatus(watchCtx, &pb.Empty{}) + if err != nil { + log.Errorf("[EruServiceWatch] watch failed: %v", err) + return + } + expectedInterval := time.Duration(math.MaxInt64) / time.Second + + for { + cancelTimer := make(chan struct{}) + go func() { + timer := time.NewTimer(expectedInterval * time.Second) + defer timer.Stop() + select { + case <-timer.C: + cancelWatch() + case <-cancelTimer: + return + } + + }() + status, err := stream.Recv() + close(cancelTimer) + if err != nil { + log.Errorf("[EruServiceWatch] recv failed: %v", err) + break + } + expectedInterval = time.Duration(status.GetIntervalInSecond()) + lbResolverBuilder.updateCh <- status.GetAddresses() + ch <- status.GetAddresses() + } + } + }() + + return ch, nil +} + +func (w *eruServiceDiscovery) dial(ctx context.Context, addr string) (*grpc.ClientConn, error) { + opts := []grpc.DialOption{ + grpc.WithInsecure(), + grpc.WithBalancerName("round_robin"), + grpc.WithStreamInterceptor(interceptor.NewStreamRetry(interceptor.RetryOptions{Max: 1})), + } + + target := makeServiceDiscoveryTarget(addr) + return grpc.DialContext(ctx, target, opts...) +} + +func makeServiceDiscoveryTarget(addr string) string { + return fmt.Sprintf("lb://_/%s", addr) +} diff --git a/client/service_discovery/resolver.go b/client/service_discovery/resolver.go new file mode 100644 index 000000000..aa8b09197 --- /dev/null +++ b/client/service_discovery/resolver.go @@ -0,0 +1,56 @@ +package service_discovery + +import ( + log "github.com/sirupsen/logrus" + + "google.golang.org/grpc/resolver" +) + +type LBResolverBuilder struct { + updateCh chan []string +} + +var lbResolverBuilder *LBResolverBuilder + +func init() { + lbResolverBuilder = &LBResolverBuilder{ + updateCh: make(chan []string), + } + resolver.Register(lbResolverBuilder) +} + +func (b *LBResolverBuilder) Scheme() string { + return "lb" +} + +func (b *LBResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { + return newLBResolver(cc, target.Endpoint, b.updateCh), nil +} + +type lbResolver struct { + cc resolver.ClientConn +} + +func newLBResolver(cc resolver.ClientConn, endpoint string, updateCh <-chan []string) *lbResolver { + r := &lbResolver{cc: cc} + r.updateAddresses(endpoint) + go func() { + for { + r.updateAddresses(<-updateCh...) + } + }() + return r +} + +func (r *lbResolver) updateAddresses(endpoints ...string) { + log.Debugf("[lbResolver] update state: %v", endpoints) + addresses := []resolver.Address{} + for _, ep := range endpoints { + addresses = append(addresses, resolver.Address{Addr: ep}) + } + r.cc.UpdateState(resolver.State{Addresses: addresses}) +} + +func (r *lbResolver) ResolveNow(_ resolver.ResolveNowOptions) {} + +func (r lbResolver) Close() {} diff --git a/client/service_discovery/service_discovery.go b/client/service_discovery/service_discovery.go new file mode 100644 index 000000000..cab75a01a --- /dev/null +++ b/client/service_discovery/service_discovery.go @@ -0,0 +1,7 @@ +package service_discovery + +import "context" + +type ServiceDiscovery interface { + Watch(context.Context) (<-chan []string, error) +}