diff --git a/internal/cluster/balancer/balancer_test.go b/internal/cluster/balancer/balancer_test.go index 4bd09805..65d8435c 100644 --- a/internal/cluster/balancer/balancer_test.go +++ b/internal/cluster/balancer/balancer_test.go @@ -112,6 +112,9 @@ func (mc *mockCluster) addNode(e *environment.Environment) *Balancer { require.NoError(mc.t, err) } + err = b.rt.Join() + require.NoError(mc.t, err) + err = b.rt.Start() if err != nil { require.NoError(mc.t, err) diff --git a/internal/cluster/routingtable/discovery.go b/internal/cluster/routingtable/discovery.go index d3343392..d3630845 100644 --- a/internal/cluster/routingtable/discovery.go +++ b/internal/cluster/routingtable/discovery.go @@ -21,8 +21,9 @@ import ( ) var ( - ErrServerGone = errors.New("server is gone") - ErrClusterJoin = errors.New("cannot join the cluster") + ErrServerGone = errors.New("server is gone") + ErrNotJoinedYet = errors.New("not joined yet") + ErrClusterJoin = errors.New("cannot join the cluster") // ErrOperationTimeout is returned when an operation times out. ErrOperationTimeout = errors.New("operation timeout") ) diff --git a/internal/cluster/routingtable/operations.go b/internal/cluster/routingtable/operations.go index b6610532..d475f7c4 100644 --- a/internal/cluster/routingtable/operations.go +++ b/internal/cluster/routingtable/operations.go @@ -25,6 +25,9 @@ import ( ) func (r *RoutingTable) lengthOfPartCommandHandler(conn redcon.Conn, cmd redcon.Command) { + // The command handlers of the routing table service should wait for the cluster join event. + <-r.joined + lengthOfPartCmd, err := protocol.ParseLengthOfPartCommand(cmd) if err != nil { protocol.WriteError(conn, err) @@ -61,6 +64,9 @@ func (r *RoutingTable) verifyRoutingTable(id uint64, table map[uint64]*route) er } func (r *RoutingTable) updateRoutingCommandHandler(conn redcon.Conn, cmd redcon.Command) { + // The command handlers of the routing table service should wait for the cluster join event. 
+ <-r.joined + r.updateRoutingMtx.Lock() defer r.updateRoutingMtx.Unlock() diff --git a/internal/cluster/routingtable/routingtable.go b/internal/cluster/routingtable/routingtable.go index 5ddbda3a..c59880ce 100644 --- a/internal/cluster/routingtable/routingtable.go +++ b/internal/cluster/routingtable/routingtable.go @@ -71,9 +71,11 @@ type RoutingTable struct { callbacks []func() callbackMtx sync.Mutex pushPeriod time.Duration - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup + // The command handlers of the routing table service should wait for the cluster join event. + joined chan struct{} + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup } func registerErrors() { @@ -108,6 +110,7 @@ func New(e *environment.Environment) *RoutingTable { client: e.Get("client").(*server.Client), server: e.Get("server").(*server.Server), pushPeriod: c.RoutingTablePushInterval, + joined: make(chan struct{}), ctx: ctx, cancel: cancel, } @@ -343,7 +346,7 @@ func (r *RoutingTable) pushPeriodically() { } } -func (r *RoutingTable) Start() error { +func (r *RoutingTable) Join() error { err := r.discovery.Start() if err != nil { return err @@ -361,13 +364,25 @@ func (r *RoutingTable) Start() error { this, err := r.discovery.FindMemberByName(r.config.MemberlistConfig.Name) if err != nil { r.log.V(2).Printf("[ERROR] Failed to get this node in cluster: %v", err) - serr := r.discovery.Shutdown() - if serr != nil { - return serr + shutdownError := r.discovery.Shutdown() + if shutdownError != nil { + return shutdownError } return err } r.this = this + close(r.joined) + return nil +} + +func (r *RoutingTable) Start() error { + select { + case <-r.joined: + // It's time to start the routing table service. Otherwise, this method will return an error. + default: + // Not yet, or the join process has failed + return ErrNotJoinedYet + } // Store the current number of members in the member list. 
// We need this to implement a simple split-brain protection algorithm. @@ -379,7 +394,7 @@ func (r *RoutingTable) Start() error { // 1 Hour ctx, cancel := context.WithTimeout(r.ctx, time.Hour) defer cancel() - err = r.tryWithInterval(ctx, time.Second, func() error { + err := r.tryWithInterval(ctx, time.Second, func() error { // Check member count quorum now. If there is no enough peers to work, wait forever. err := r.CheckMemberCountQuorum() if err != nil { diff --git a/internal/cluster/routingtable/routingtable_test.go b/internal/cluster/routingtable/routingtable_test.go index d475b73a..5b5d0ccc 100644 --- a/internal/cluster/routingtable/routingtable_test.go +++ b/internal/cluster/routingtable/routingtable_test.go @@ -87,6 +87,10 @@ func (t *testCluster) addNode(c *config.Config) (*RoutingTable, error) { srv := testutil.NewServer(c) rt := newRoutingTableForTest(c, srv) + err = rt.Join() + if err != nil { + return nil, err + } err = rt.Start() if err != nil { return nil, err diff --git a/internal/testcluster/testcluster.go b/internal/testcluster/testcluster.go index 0e9cec24..6c1c1ff8 100644 --- a/internal/testcluster/testcluster.go +++ b/internal/testcluster/testcluster.go @@ -142,6 +142,10 @@ func (t *TestCluster) AddMember(e *environment.Environment) service.Service { s := t.newService(e) rt := e.Get("routingtable").(*routingtable.RoutingTable) + err = rt.Join() + if err != nil { + panic(fmt.Sprintf("failed to join the Olric cluster: %v", err)) + } err = rt.Start() if err != nil { panic(fmt.Sprintf("failed to start the routing table: %v", err)) diff --git a/olric.go b/olric.go index 7b638376..44e4a19f 100644 --- a/olric.go +++ b/olric.go @@ -341,6 +341,13 @@ func (db *Olric) Start() error { return err } + // Join the cluster first; the routing table service is started only after the join succeeds.
+ if err := db.rt.Join(); err != nil { + if err != nil { + db.log.V(2).Printf("[ERROR] Failed to join the Olric cluster: %v", err) + } + return err + } // Start routing table service and member discovery subsystem. if err := db.rt.Start(); err != nil { if err != nil {