From 6b3e03a8713169633d5c1ce5ce06a22a2911014a Mon Sep 17 00:00:00 2001 From: zc Date: Thu, 13 Aug 2020 15:06:50 +0800 Subject: [PATCH] client: resolver updates svc addresses periodically --- client/resolver/eru/builder.go | 19 +++++++++ client/resolver/eru/resolver.go | 65 ++++++++++++++++++++++++++++++ client/resolver/static/builder.go | 19 +++++++++ client/resolver/static/resolver.go | 30 ++++++++++++++ 4 files changed, 133 insertions(+) create mode 100644 client/resolver/eru/builder.go create mode 100644 client/resolver/eru/resolver.go create mode 100644 client/resolver/static/builder.go create mode 100644 client/resolver/static/resolver.go diff --git a/client/resolver/eru/builder.go b/client/resolver/eru/builder.go new file mode 100644 index 000000000..1613f6814 --- /dev/null +++ b/client/resolver/eru/builder.go @@ -0,0 +1,19 @@ +package eru + +import "google.golang.org/grpc/resolver" + +type eruResolverBuilder struct{} + +func init() { + resolver.Register(&eruResolverBuilder{}) +} + +// Scheme for interface +func (b *eruResolverBuilder) Scheme() string { + return "eru" +} + +// Build for interface +func (b *eruResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { + return New(cc, target.Endpoint), nil +} diff --git a/client/resolver/eru/resolver.go b/client/resolver/eru/resolver.go new file mode 100644 index 000000000..a3a967a2d --- /dev/null +++ b/client/resolver/eru/resolver.go @@ -0,0 +1,65 @@ +package eru + +import ( + "context" + + "github.com/projecteru2/core/client/service_discovery" + log "github.com/sirupsen/logrus" + "google.golang.org/grpc/resolver" +) + +type eruResolver struct { + cc resolver.ClientConn + cancel context.CancelFunc + discovery service_discovery.ServiceDiscovery +} + +func New(cc resolver.ClientConn, endpoint string) *eruResolver { + r := &eruResolver{ + cc: cc, + discovery: service_discovery.New(endpoint), + } + go r.sync() + return r +} + +// ResolveNow for interface +func (r *eruResolver) ResolveNow(_ resolver.ResolveNowOptions) {} + +// Close for interface +func (r *eruResolver) Close() { + r.cancel() +} + +func (r *eruResolver) sync() { + log.Info("[eruResolver] start sync service discovery") + ctx, cancel := context.WithCancel(context.Background()) + r.cancel = cancel + defer cancel() + + ch, err := r.discovery.Watch(ctx) + if err != nil { + log.Errorf("[eruResolver] failed to watch service status: %v", err) + return + } + for { + select { + case <-ctx.Done(): + log.Errorf("[eruResolver] watch interrupted: %v", ctx.Err()) + break + case endpoints, ok := <-ch: + if !ok { + log.Error("[eruResolver] watch closed") + break + } + + var addresses []resolver.Address + log.Debugf("[eruResolver] update state: %v", endpoints) + for _, ep := range endpoints { + addresses = append(addresses, resolver.Address{Addr: ep}) + } + r.cc.UpdateState(resolver.State{Addresses: addresses}) + } + } + +} diff --git a/client/resolver/static/builder.go b/client/resolver/static/builder.go new file mode 100644 index 000000000..bb53d70b3 --- /dev/null +++ b/client/resolver/static/builder.go @@ -0,0 +1,19 @@ +package static + +import "google.golang.org/grpc/resolver" + +type staticResolverBuilder struct{} + +func init() { + resolver.Register(&staticResolverBuilder{}) +} + +// Scheme for interface +func (b *staticResolverBuilder) Scheme() string { + return "static" +} + +// Build for interface +func (b *staticResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { + return New(cc, target.Endpoint), nil +} diff --git a/client/resolver/static/resolver.go b/client/resolver/static/resolver.go new file mode 100644 index 000000000..01fe59ccf --- /dev/null +++ b/client/resolver/static/resolver.go @@ -0,0 +1,30 @@ +package static + +import ( + "strings" + + "google.golang.org/grpc/resolver" +) + +type staticResolver struct { + addresses []resolver.Address + cc resolver.ClientConn +} + +func New(cc resolver.ClientConn, endpoints string) *staticResolver { + var addresses []resolver.Address + for _, ep := range strings.Split(endpoints, ",") { + addresses = append(addresses, resolver.Address{Addr: ep}) + } + cc.UpdateState(resolver.State{Addresses: addresses}) + return &staticResolver{ + cc: cc, + addresses: addresses, + } +} + +// ResolveNow for interface +func (r *staticResolver) ResolveNow(_ resolver.ResolveNowOptions) {} + +// Close for interface +func (r *staticResolver) Close() {}