Skip to content

Commit

Permalink
Fixed node controller assignments update after pods restart (#368)
Browse files Browse the repository at this point in the history
If a storage node restarts and its IP stops responding, the coordinator
health check will detect it within a ~2 second period, though the component
that sends the cluster assignments updates will not detect the issue.

One of the reasons is that it's using a gRPC stream, so there is no
timeout on the send operation (because a timeout would apply to the lifetime of
the stream itself, which instead needs to be long-lived).

The storage servers need to receive the new cluster assignments after
restart, otherwise they cannot pass it back to their clients.

To fix the issue, we close the assignments stream whenever the
health-check failure is triggered. This will ensure that the coordinator
will retry after the restart.
  • Loading branch information
merlimat authored Jul 22, 2023
1 parent 59d2cc8 commit ea7a70f
Showing 1 changed file with 43 additions and 23 deletions.
66 changes: 43 additions & 23 deletions coordinator/impl/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"oxia/coordinator/model"
"oxia/proto"
"sync"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -60,11 +59,13 @@ type nodeController struct {
nodeAvailabilityListener NodeAvailabilityListener
rpc RpcProvider
log zerolog.Logger
closed atomic.Bool
ctx context.Context
cancel context.CancelFunc
initialRetryBackoff time.Duration

sendAssignmentsCtx context.Context
sendAssignmentsCancel context.CancelFunc

nodeIsRunningGauge metrics.Gauge
failedHealthChecks metrics.Counter
}
Expand Down Expand Up @@ -142,6 +143,10 @@ func (n *nodeController) healthCheckWithRetries() {
n.failedHealthChecks.Inc()
n.nodeAvailabilityListener.NodeBecameUnavailable(n.addr)
}

// To avoid having the send-assignments stream miss the notification that the
// current node went down, we interrupt the current stream when the ping to the node fails
n.sendAssignmentsCancel()
})
}

Expand Down Expand Up @@ -231,41 +236,56 @@ func (n *nodeController) sendAssignmentsUpdatesWithRetries() {
}

func (n *nodeController) sendAssignmentsUpdates(backoff backoff.BackOff) error {
stream, err := n.rpc.PushShardAssignments(n.ctx, n.addr)
n.Lock()
n.sendAssignmentsCtx, n.sendAssignmentsCancel = context.WithCancel(n.ctx)
n.Unlock()
defer n.sendAssignmentsCancel()

stream, err := n.rpc.PushShardAssignments(n.sendAssignmentsCtx, n.addr)
if err != nil {
return err
}

var assignments *proto.ShardAssignments
for !n.closed.Load() {

assignments, err = n.shardAssignmentsProvider.WaitForNextUpdate(stream.Context(), assignments)
if err != nil {
return err
}
for {
select {
case <-n.ctx.Done():
return nil

default:
n.log.Debug().
Interface("current-assignments", assignments).
Msg("Waiting for next assignments update")
assignments, err = n.shardAssignmentsProvider.WaitForNextUpdate(stream.Context(), assignments)
if err != nil {
return err
}

if assignments == nil {
continue
}
if assignments == nil {
n.log.Debug().
Msg("Assignments are nil")
continue
}

n.log.Debug().
Interface("assignments", assignments).
Msg("Sending assignments")
n.log.Debug().
Interface("assignments", assignments).
Msg("Sending assignments")

if err := stream.Send(assignments); err != nil {
n.log.Debug().Err(err).
Msg("Failed to send assignments")
return err
}
if err := stream.Send(assignments); err != nil {
n.log.Debug().Err(err).
Msg("Failed to send assignments")
return err
}

backoff.Reset()
n.log.Debug().
Msg("Send assignments completed successfully")
backoff.Reset()
}
}

return nil
}

func (n *nodeController) Close() error {
n.closed.Store(true)
n.nodeIsRunningGauge.Unregister()
n.cancel()
return nil
Expand Down

0 comments on commit ea7a70f

Please sign in to comment.