Skip to content

Commit

Permalink
client: resolver updates svc addresses periodically
Browse files Browse the repository at this point in the history
  • Loading branch information
jschwinger233 authored and CMGS committed Aug 14, 2020
1 parent 6eb0177 commit 6b3e03a
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 0 deletions.
19 changes: 19 additions & 0 deletions client/resolver/eru/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package eru

import "google.golang.org/grpc/resolver"

type eruResolverBuilder struct{}

func init() {
resolver.Register(&eruResolverBuilder{})
}

// Scheme for interface
func (b *eruResolverBuilder) Scheme() string {
return "eru"
}

// Build for interface
func (b *eruResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
return New(cc, target.Endpoint), nil
}
65 changes: 65 additions & 0 deletions client/resolver/eru/resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package eru

import (
"context"

"github.com/projecteru2/core/client/service_discovery"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/resolver"
)

type eruResolver struct {
cc resolver.ClientConn
cancel context.CancelFunc
discovery service_discovery.ServiceDiscovery
}

func New(cc resolver.ClientConn, endpoint string) *eruResolver {
r := &eruResolver{
cc: cc,
discovery: service_discovery.New(endpoint),
}
go r.sync()
return r
}

// ResolveNow for interface
func (r *eruResolver) ResolveNow(_ resolver.ResolveNowOptions) {}

// Close for interface
func (r *eruResolver) Close() {
r.cancel()
}

func (r *eruResolver) sync() {
log.Info("[eruResolver] start sync service discovery")
ctx, cancel := context.WithCancel(context.Background())
r.cancel = cancel
defer cancel()

ch, err := r.discovery.Watch(ctx)
if err != nil {
log.Errorf("[eruResolver] failed to watch service status: %v", err)
return
}
for {
select {
case <-ctx.Done():
log.Errorf("[eruResolver] watch interrupted: %v", ctx.Err())
break
case endpoints, ok := <-ch:
if !ok {
log.Error("[eruResolver] watch closed")
break
}

var addresses []resolver.Address
log.Debugf("[eruResolver] update state: %v", endpoints)
for _, ep := range endpoints {
addresses = append(addresses, resolver.Address{Addr: ep})
}
r.cc.UpdateState(resolver.State{Addresses: addresses})
}
}

}
19 changes: 19 additions & 0 deletions client/resolver/static/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package static

import "google.golang.org/grpc/resolver"

type staticResolverBuilder struct{}

func init() {
resolver.Register(&staticResolverBuilder{})
}

// Scheme for interface
func (b *staticResolverBuilder) Scheme() string {
return "static"
}

// Build for interface
func (b *staticResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
return New(cc, target.Endpoint), nil
}
30 changes: 30 additions & 0 deletions client/resolver/static/resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package static

import (
"strings"

"google.golang.org/grpc/resolver"
)

type staticResolver struct {
addresses []resolver.Address
cc resolver.ClientConn
}

func New(cc resolver.ClientConn, endpoints string) *staticResolver {
var addresses []resolver.Address
for _, ep := range strings.Split(endpoints, ",") {
addresses = append(addresses, resolver.Address{Addr: ep})
}
cc.UpdateState(resolver.State{Addresses: addresses})
return &staticResolver{
cc: cc,
addresses: addresses,
}
}

// ResolveNow for interface
func (r *staticResolver) ResolveNow(_ resolver.ResolveNowOptions) {}

// Close for interface
func (r *staticResolver) Close() {}

0 comments on commit 6b3e03a

Please sign in to comment.