From fc74447c9d2d0788e801c251650d318ad6b2e203 Mon Sep 17 00:00:00 2001 From: Burak Sezer Date: Fri, 10 May 2024 21:46:29 +0300 Subject: [PATCH] Add join process before starting routing table #247 The code has been updated to include a join process before starting the routing table in internal/cluster/routingtable/ and internal/testcluster/testcluster.go. This ensures that the command handlers of the routing table service wait for the cluster join event. Additionally, any join process failure in RoutingTable is now handled properly. The changes also include the introduction of 'ErrNotJoinedYet' error in the discovery.go file to handle 'not joined yet' situations. --- internal/cluster/balancer/balancer_test.go | 3 ++ internal/cluster/routingtable/discovery.go | 5 +-- internal/cluster/routingtable/operations.go | 6 ++++ internal/cluster/routingtable/routingtable.go | 31 ++++++++++++++----- .../cluster/routingtable/routingtable_test.go | 4 +++ internal/testcluster/testcluster.go | 4 +++ olric.go | 7 +++++ 7 files changed, 50 insertions(+), 10 deletions(-) diff --git a/internal/cluster/balancer/balancer_test.go b/internal/cluster/balancer/balancer_test.go index 7780766f..cb94b48e 100644 --- a/internal/cluster/balancer/balancer_test.go +++ b/internal/cluster/balancer/balancer_test.go @@ -112,6 +112,9 @@ func (mc *mockCluster) addNode(e *environment.Environment) *Balancer { require.NoError(mc.t, err) } + err = b.rt.Join() + require.NoError(mc.t, err) + err = b.rt.Start() if err != nil { require.NoError(mc.t, err) diff --git a/internal/cluster/routingtable/discovery.go b/internal/cluster/routingtable/discovery.go index 8d29384b..ced0134d 100644 --- a/internal/cluster/routingtable/discovery.go +++ b/internal/cluster/routingtable/discovery.go @@ -21,8 +21,9 @@ import ( ) var ( - ErrServerGone = errors.New("server is gone") - ErrClusterJoin = errors.New("cannot join the cluster") + ErrServerGone = errors.New("server is gone") + ErrNotJoinedYet = errors.New("not joined yet") + 
ErrClusterJoin = errors.New("cannot join the cluster") // ErrOperationTimeout is returned when an operation times out. ErrOperationTimeout = errors.New("operation timeout") ) diff --git a/internal/cluster/routingtable/operations.go b/internal/cluster/routingtable/operations.go index 0e3e7d48..37b841c8 100644 --- a/internal/cluster/routingtable/operations.go +++ b/internal/cluster/routingtable/operations.go @@ -25,6 +25,9 @@ import ( ) func (r *RoutingTable) lengthOfPartCommandHandler(conn redcon.Conn, cmd redcon.Command) { + // The command handlers of the routing table service should wait for the cluster join event. + <-r.joined + lengthOfPartCmd, err := protocol.ParseLengthOfPartCommand(cmd) if err != nil { protocol.WriteError(conn, err) @@ -61,6 +64,9 @@ func (r *RoutingTable) verifyRoutingTable(id uint64, table map[uint64]*route) er } func (r *RoutingTable) updateRoutingCommandHandler(conn redcon.Conn, cmd redcon.Command) { + // The command handlers of the routing table service should wait for the cluster join event. + <-r.joined + r.updateRoutingMtx.Lock() defer r.updateRoutingMtx.Unlock() diff --git a/internal/cluster/routingtable/routingtable.go b/internal/cluster/routingtable/routingtable.go index 4902fcdc..a1abba26 100644 --- a/internal/cluster/routingtable/routingtable.go +++ b/internal/cluster/routingtable/routingtable.go @@ -71,9 +71,11 @@ type RoutingTable struct { callbacks []func() callbackMtx sync.Mutex pushPeriod time.Duration - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup + // The command handlers of the routing table service should wait for the cluster join event. 
+ joined chan struct{} + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup } func registerErrors() { @@ -108,6 +110,7 @@ func New(e *environment.Environment) *RoutingTable { client: e.Get("client").(*server.Client), server: e.Get("server").(*server.Server), pushPeriod: c.RoutingTablePushInterval, + joined: make(chan struct{}), ctx: ctx, cancel: cancel, } @@ -343,7 +346,7 @@ func (r *RoutingTable) pushPeriodically() { } } -func (r *RoutingTable) Start() error { +func (r *RoutingTable) Join() error { err := r.discovery.Start() if err != nil { return err @@ -361,13 +364,25 @@ func (r *RoutingTable) Start() error { this, err := r.discovery.FindMemberByName(r.config.MemberlistConfig.Name) if err != nil { r.log.V(2).Printf("[ERROR] Failed to get this node in cluster: %v", err) - serr := r.discovery.Shutdown() - if serr != nil { - return serr + shutdownError := r.discovery.Shutdown() + if shutdownError != nil { + return shutdownError } return err } r.this = this + close(r.joined) + return nil +} + +func (r *RoutingTable) Start() error { + select { + case <-r.joined: + // It's time to start the routing table service. Otherwise, this method will return an error. + default: + // Not yet, or the join process has failed + return ErrNotJoinedYet + } // Store the current number of members in the member list. // We need this to implement a simple split-brain protection algorithm. @@ -379,7 +394,7 @@ func (r *RoutingTable) Start() error { // 1 Hour ctx, cancel := context.WithTimeout(r.ctx, time.Hour) defer cancel() - err = r.tryWithInterval(ctx, time.Second, func() error { + err := r.tryWithInterval(ctx, time.Second, func() error { // Check member count quorum now. If there is no enough peers to work, wait forever. 
err := r.CheckMemberCountQuorum() if err != nil { diff --git a/internal/cluster/routingtable/routingtable_test.go b/internal/cluster/routingtable/routingtable_test.go index 84f232e0..fea26e3e 100644 --- a/internal/cluster/routingtable/routingtable_test.go +++ b/internal/cluster/routingtable/routingtable_test.go @@ -87,6 +87,10 @@ func (t *testCluster) addNode(c *config.Config) (*RoutingTable, error) { srv := testutil.NewServer(c) rt := newRoutingTableForTest(c, srv) + err = rt.Join() + if err != nil { + return nil, err + } err = rt.Start() if err != nil { return nil, err diff --git a/internal/testcluster/testcluster.go b/internal/testcluster/testcluster.go index 7d0fb244..0c00bb07 100644 --- a/internal/testcluster/testcluster.go +++ b/internal/testcluster/testcluster.go @@ -142,6 +142,10 @@ func (t *TestCluster) AddMember(e *environment.Environment) service.Service { s := t.newService(e) rt := e.Get("routingtable").(*routingtable.RoutingTable) + err = rt.Join() + if err != nil { + panic(fmt.Sprintf("failed to join the Olric cluster: %v", err)) + } err = rt.Start() if err != nil { panic(fmt.Sprintf("failed to start the routing table: %v", err)) diff --git a/olric.go b/olric.go index f79bc789..8cfa4a79 100644 --- a/olric.go +++ b/olric.go @@ -341,6 +341,13 @@ func (db *Olric) Start() error { return err } + // First, we need to join the cluster. Then, the routing table can be started. + if err := db.rt.Join(); err != nil { + if err != nil { + db.log.V(2).Printf("[ERROR] Failed to join the Olric cluster: %v", err) + } + return err + } // Start routing table service and member discovery subsystem. if err := db.rt.Start(); err != nil { if err != nil {