Skip to content

Commit

Permalink
client: discovery watches service status change
Browse files Browse the repository at this point in the history
  • Loading branch information
jschwinger233 authored and CMGS committed Aug 14, 2020
1 parent 6b3e03a commit 8a1956c
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 0 deletions.
86 changes: 86 additions & 0 deletions client/service_discovery/eru_service_discovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package service_discovery

import (
"context"
"fmt"
"math"
"time"

"github.com/projecteru2/core/client/interceptor"
pb "github.com/projecteru2/core/rpc/gen"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
)

type eruServiceDiscovery struct {
endpoint string
}

func New(endpoint string) *eruServiceDiscovery {
return &eruServiceDiscovery{
endpoint: endpoint,
}
}

func (w *eruServiceDiscovery) Watch(ctx context.Context) (_ <-chan []string, err error) {
cc, err := w.dial(ctx, w.endpoint)
if err != nil {
log.Errorf("[EruServiceWatch] dial failed: %v", err)
return
}
client := pb.NewCoreRPCClient(cc)
ch := make(chan []string)
go func() {
defer close(ch)
for {
watchCtx, cancelWatch := context.WithCancel(ctx)
stream, err := client.WatchServiceStatus(watchCtx, &pb.Empty{})
if err != nil {
log.Errorf("[EruServiceWatch] watch failed: %v", err)
return
}
expectedInterval := time.Duration(math.MaxInt64) / time.Second

for {
cancelTimer := make(chan struct{})
go func() {
timer := time.NewTimer(expectedInterval * time.Second)
defer timer.Stop()
select {
case <-timer.C:
cancelWatch()
case <-cancelTimer:
return
}

}()
status, err := stream.Recv()
close(cancelTimer)
if err != nil {
log.Errorf("[EruServiceWatch] recv failed: %v", err)
break
}
expectedInterval = time.Duration(status.GetIntervalInSecond())
lbResolverBuilder.updateCh <- status.GetAddresses()
ch <- status.GetAddresses()
}
}
}()

return ch, nil
}

func (w *eruServiceDiscovery) dial(ctx context.Context, addr string) (*grpc.ClientConn, error) {
opts := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithBalancerName("round_robin"),
grpc.WithStreamInterceptor(interceptor.NewStreamRetry(interceptor.RetryOptions{Max: 1})),
}

target := makeServiceDiscoveryTarget(addr)
return grpc.DialContext(ctx, target, opts...)
}

func makeServiceDiscoveryTarget(addr string) string {
return fmt.Sprintf("lb://_/%s", addr)
}
56 changes: 56 additions & 0 deletions client/service_discovery/resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package service_discovery

import (
log "github.com/sirupsen/logrus"

"google.golang.org/grpc/resolver"
)

type LBResolverBuilder struct {
updateCh chan []string
}

var lbResolverBuilder *LBResolverBuilder

func init() {
lbResolverBuilder = &LBResolverBuilder{
updateCh: make(chan []string),
}
resolver.Register(lbResolverBuilder)
}

func (b *LBResolverBuilder) Scheme() string {
return "lb"
}

func (b *LBResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
return newLBResolver(cc, target.Endpoint, b.updateCh), nil
}

type lbResolver struct {
cc resolver.ClientConn
}

func newLBResolver(cc resolver.ClientConn, endpoint string, updateCh <-chan []string) *lbResolver {
r := &lbResolver{cc: cc}
r.updateAddresses(endpoint)
go func() {
for {
r.updateAddresses(<-updateCh...)
}
}()
return r
}

func (r *lbResolver) updateAddresses(endpoints ...string) {
log.Debugf("[lbResolver] update state: %v", endpoints)
addresses := []resolver.Address{}
for _, ep := range endpoints {
addresses = append(addresses, resolver.Address{Addr: ep})
}
r.cc.UpdateState(resolver.State{Addresses: addresses})
}

func (r *lbResolver) ResolveNow(_ resolver.ResolveNowOptions) {}

func (r lbResolver) Close() {}
7 changes: 7 additions & 0 deletions client/service_discovery/service_discovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package service_discovery

import "context"

type ServiceDiscovery interface {
Watch(context.Context) (<-chan []string, error)
}

0 comments on commit 8a1956c

Please sign in to comment.