Skip to content

Commit

Permalink
Add join process before starting routing table #247
Browse files Browse the repository at this point in the history
The code has been updated to include a join process before starting the routing table in internal/cluster/routingtable/ and internal/testcluster/testcluster.go. This ensures that the command handlers of the routing table service wait for the cluster join event. Additionally, any join process failure in RoutingTable is now handled properly. The changes also include the introduction of 'ErrNotJoinedYet' error in the discovery.go file to handle 'not joined yet' situations.
  • Loading branch information
buraksezer committed May 10, 2024
1 parent e773606 commit 82e4433
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 10 deletions.
3 changes: 3 additions & 0 deletions internal/cluster/balancer/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ func (mc *mockCluster) addNode(e *environment.Environment) *Balancer {
require.NoError(mc.t, err)
}

err = b.rt.Join()
require.NoError(mc.t, err)

err = b.rt.Start()
if err != nil {
require.NoError(mc.t, err)
Expand Down
5 changes: 3 additions & 2 deletions internal/cluster/routingtable/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import (
)

var (
ErrServerGone = errors.New("server is gone")
ErrClusterJoin = errors.New("cannot join the cluster")
ErrServerGone = errors.New("server is gone")
ErrNotJoinedYet = errors.New("not joined yet")
ErrClusterJoin = errors.New("cannot join the cluster")
// ErrOperationTimeout is returned when an operation times out.
ErrOperationTimeout = errors.New("operation timeout")
)
Expand Down
6 changes: 6 additions & 0 deletions internal/cluster/routingtable/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
)

func (r *RoutingTable) lengthOfPartCommandHandler(conn redcon.Conn, cmd redcon.Command) {
// The command handlers of the routing table service should wait for the cluster join event.
<-r.joined

lengthOfPartCmd, err := protocol.ParseLengthOfPartCommand(cmd)
if err != nil {
protocol.WriteError(conn, err)
Expand Down Expand Up @@ -61,6 +64,9 @@ func (r *RoutingTable) verifyRoutingTable(id uint64, table map[uint64]*route) er
}

func (r *RoutingTable) updateRoutingCommandHandler(conn redcon.Conn, cmd redcon.Command) {
// The command handlers of the routing table service should wait for the cluster join event.
<-r.joined

r.updateRoutingMtx.Lock()
defer r.updateRoutingMtx.Unlock()

Expand Down
31 changes: 23 additions & 8 deletions internal/cluster/routingtable/routingtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,11 @@ type RoutingTable struct {
callbacks []func()
callbackMtx sync.Mutex
pushPeriod time.Duration
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
// The command handlers of the routing table service should wait for the cluster join event.
joined chan struct{}
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}

func registerErrors() {
Expand Down Expand Up @@ -108,6 +110,7 @@ func New(e *environment.Environment) *RoutingTable {
client: e.Get("client").(*server.Client),
server: e.Get("server").(*server.Server),
pushPeriod: c.RoutingTablePushInterval,
joined: make(chan struct{}),
ctx: ctx,
cancel: cancel,
}
Expand Down Expand Up @@ -343,7 +346,7 @@ func (r *RoutingTable) pushPeriodically() {
}
}

func (r *RoutingTable) Start() error {
func (r *RoutingTable) Join() error {
err := r.discovery.Start()
if err != nil {
return err
Expand All @@ -361,13 +364,25 @@ func (r *RoutingTable) Start() error {
this, err := r.discovery.FindMemberByName(r.config.MemberlistConfig.Name)
if err != nil {
r.log.V(2).Printf("[ERROR] Failed to get this node in cluster: %v", err)
serr := r.discovery.Shutdown()
if serr != nil {
return serr
shutdownError := r.discovery.Shutdown()
if shutdownError != nil {
return shutdownError
}
return err
}
r.this = this
close(r.joined)
return nil
}

func (r *RoutingTable) Start() error {
select {
case <-r.joined:
// It's time to start the routing table service. Otherwise, this method will return an error.
default:
// Not yet, or the join process has failed
return ErrNotJoinedYet
}

// Store the current number of members in the member list.
// We need this to implement a simple split-brain protection algorithm.
Expand All @@ -379,7 +394,7 @@ func (r *RoutingTable) Start() error {
// 1 Hour
ctx, cancel := context.WithTimeout(r.ctx, time.Hour)
defer cancel()
err = r.tryWithInterval(ctx, time.Second, func() error {
err := r.tryWithInterval(ctx, time.Second, func() error {
// Check member count quorum now. If there are not enough peers to work, wait forever.
err := r.CheckMemberCountQuorum()
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions internal/cluster/routingtable/routingtable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ func (t *testCluster) addNode(c *config.Config) (*RoutingTable, error) {

srv := testutil.NewServer(c)
rt := newRoutingTableForTest(c, srv)
err = rt.Join()
if err != nil {
return nil, err
}
err = rt.Start()
if err != nil {
return nil, err
Expand Down
4 changes: 4 additions & 0 deletions internal/testcluster/testcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ func (t *TestCluster) AddMember(e *environment.Environment) service.Service {

s := t.newService(e)
rt := e.Get("routingtable").(*routingtable.RoutingTable)
err = rt.Join()
if err != nil {
panic(fmt.Sprintf("failed to join the Olric cluster: %v", err))
}
err = rt.Start()
if err != nil {
panic(fmt.Sprintf("failed to start the routing table: %v", err))
Expand Down
7 changes: 7 additions & 0 deletions olric.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,13 @@ func (db *Olric) Start() error {
return err
}

// First, we need to join the cluster. Only then can the routing table be started.
if err := db.rt.Join(); err != nil {
if err != nil {
db.log.V(2).Printf("[ERROR] Failed to join the Olric cluster: %v", err)
}
return err
}
// Start routing table service and member discovery subsystem.
if err := db.rt.Start(); err != nil {
if err != nil {
Expand Down

0 comments on commit 82e4433

Please sign in to comment.