Skip to content

Commit

Permalink
cluster: register service can fast fail (1s)
Browse files Browse the repository at this point in the history
  • Loading branch information
jschwinger233 authored and CMGS committed Aug 14, 2020
1 parent 50cedea commit 81e2e4e
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 9 deletions.
21 changes: 14 additions & 7 deletions cluster/calcium/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,17 +101,22 @@ func (c *Calcium) RegisterService(ctx context.Context) (unregister func(), err e
log.Errorf("[RegisterService] failed to get outbound address: %v", err)
return
}
if err = c.store.RegisterService(ctx, serviceAddress, c.config.GRPCConfig.ServiceHeartbeatInterval); err != nil {

ctxRegister, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
if err = c.store.RegisterService(ctxRegister, serviceAddress, c.config.GRPCConfig.ServiceHeartbeatInterval); err != nil {
log.Errorf("[RegisterService] failed to register service: %v", err)
return
}

done := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
ctxHeartbeat, cancelHeartbeat := context.WithCancel(ctx)
go func() {
defer close(done)
defer func() {
if err := c.store.UnregisterService(context.Background(), serviceAddress); err != nil {
ctxUnregister, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := c.store.UnregisterService(ctxUnregister, serviceAddress); err != nil {
log.Errorf("[RegisterService] failed to unregister service: %v", err)
}
}()
Expand All @@ -120,17 +125,19 @@ func (c *Calcium) RegisterService(ctx context.Context) (unregister func(), err e
for {
select {
case <-timer.C:
if err := c.store.RegisterService(ctx, serviceAddress, c.config.GRPCConfig.ServiceHeartbeatInterval); err != nil {
ctxRegister, cancel = context.WithTimeout(ctxHeartbeat, time.Second)
if err := c.store.RegisterService(ctxRegister, serviceAddress, c.config.GRPCConfig.ServiceHeartbeatInterval); err != nil {
log.Errorf("[RegisterService] failed to register service: %v", err)
}
case <-ctx.Done():
log.Infof("[RegisterService] context done: %v", ctx.Err())
cancel()
case <-ctxRegister.Done():
log.Infof("[RegisterService] context done: %v", ctxRegister.Err())
return
}
}
}()
return func() {
cancel()
cancelHeartbeat()
<-done
}, err
}
4 changes: 2 additions & 2 deletions cluster/calcium/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestServiceStatusStream(t *testing.T) {
c.store = store

registered := map[string]int{}
store.On("RegisterService", mock.AnythingOfType("*context.cancelCtx"), mock.AnythingOfType("string"), mock.AnythingOfType("time.Duration")).Return(
store.On("RegisterService", mock.AnythingOfType("*context.timerCtx"), mock.AnythingOfType("string"), mock.AnythingOfType("time.Duration")).Return(
func(_ context.Context, addr string, _ time.Duration) error {
if v, ok := registered[addr]; ok {
registered[addr] = v + 1
Expand All @@ -31,7 +31,7 @@ func TestServiceStatusStream(t *testing.T) {
return nil
},
)
store.On("UnregisterService", mock.AnythingOfType("*context.emptyCtx"), mock.AnythingOfType("string")).Return(
store.On("UnregisterService", mock.AnythingOfType("*context.timerCtx"), mock.AnythingOfType("string")).Return(
func(_ context.Context, addr string) error {
delete(registered, addr)
return nil
Expand Down

0 comments on commit 81e2e4e

Please sign in to comment.