Skip to content

Commit

Permalink
fix(tests): added tests for Freeze/Reset
Browse files Browse the repository at this point in the history
  • Loading branch information
cnlangzi committed Mar 19, 2024
1 parent 50b8e37 commit 47acd96
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 83 deletions.
89 changes: 44 additions & 45 deletions mutext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,41 +10,41 @@ import (
"github.com/yaitoo/sqle"
)

func createCluster(ctx context.Context, num int) ([]string, []*Node, []func(), error) {
func createCluster(num int) ([]string, []*Node, func(), error) {
var peers []string
var nodes []*Node
var clean []func()

release := func() {
for _, c := range clean {
c()
}
}

for i := 0; i < num; i++ {
db, fn, err := createSqlite3()
if err != nil {
return nil, nil, clean, err
return nil, nil, release, err
}
clean = append(clean, fn)
n := NewNode(getFreeAddr(), sqle.Open(db))
err = n.Start(ctx)
err = n.Start()
if err != nil {
return nil, nil, clean, err
return nil, nil, release, err
}
peers = append(peers, n.addr)
nodes = append(nodes, n)

clean = append(clean, n.Stop)
}

return peers, nodes, clean, nil
return peers, nodes, release, nil
}

func TestLock(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
peers, nodes, clean, err := createCluster(ctx, 5)

peers, nodes, clean, err := createCluster(5)
require.NoError(t, err)

defer func() {
for _, c := range clean {
c()
}
}()
defer clean()

tests := []struct {
name string
Expand Down Expand Up @@ -130,7 +130,9 @@ func TestLock(t *testing.T) {
m := New("lock_should_work", "wallet", "minority_nodes_are_down", WithPeers(peers...), WithTTL(10*time.Second))

nodes[0].Stop()
defer nodes[0].Start() // nolint: errcheck
nodes[1].Stop()
defer nodes[1].Start() // nolint: errcheck

err := m.Lock(context.TODO())

Expand All @@ -151,8 +153,11 @@ func TestLock(t *testing.T) {
m := New("lock_should_work", "wallet", "majority_nodes_are_down", WithPeers(peers...), WithTTL(10*time.Second))

nodes[0].Stop()
defer nodes[0].Start() // nolint: errcheck
nodes[1].Stop()
defer nodes[1].Start() // nolint: errcheck
nodes[2].Stop()
defer nodes[2].Start() // nolint: errcheck

err = m.Lock(context.TODO())
r.Error(err, async.ErrTooLessDone)
Expand All @@ -168,17 +173,9 @@ func TestLock(t *testing.T) {
}

func TestRenew(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
peers, nodes, clean, err := createCluster(ctx, 5)

peers, nodes, clean, err := createCluster(5)
require.NoError(t, err)

defer func() {
for _, c := range clean {
c()
}
}()
defer clean()

tests := []struct {
name string
Expand Down Expand Up @@ -290,7 +287,9 @@ func TestRenew(t *testing.T) {
r.Equal("renew_minority", m.lease.Key)

nodes[0].Stop()
defer nodes[0].Start() // nolint: errcheck
nodes[1].Stop()
defer nodes[1].Start() // nolint: errcheck
err = m.Renew(context.TODO())
r.NoError(err)

Expand All @@ -309,8 +308,12 @@ func TestRenew(t *testing.T) {
r.Equal("renew_majority", m.lease.Key)

nodes[0].Stop()
defer nodes[0].Start() // nolint: errcheck
nodes[1].Stop()
defer nodes[1].Start() // nolint: errcheck
nodes[2].Stop()
defer nodes[2].Start() // nolint: errcheck

err = m.Renew(context.TODO())
r.ErrorIs(err, async.ErrTooLessDone)

Expand All @@ -326,17 +329,9 @@ func TestRenew(t *testing.T) {
}

func TestUnlock(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
peers, nodes, clean, err := createCluster(ctx, 5)

peers, nodes, clean, err := createCluster(5)
require.NoError(t, err)

defer func() {
for _, c := range clean {
c()
}
}()
defer clean()

tests := []struct {
name string
Expand Down Expand Up @@ -407,7 +402,11 @@ func TestUnlock(t *testing.T) {
r.Equal("unlock_minority", m.lease.Key)

nodes[0].Stop()
defer nodes[0].Start() // nolint: errcheck

nodes[1].Stop()
defer nodes[1].Start() // nolint: errcheck

err = m.Unlock(context.TODO())
r.NoError(err)

Expand All @@ -428,8 +427,11 @@ func TestUnlock(t *testing.T) {
r.Equal("unlock_majority", m.lease.Key)

nodes[0].Stop()
defer nodes[0].Start() // nolint: errcheck
nodes[1].Stop()
defer nodes[1].Start() // nolint: errcheck
nodes[2].Stop()
defer nodes[2].Start() // nolint: errcheck

err = m.Unlock(context.TODO())
r.ErrorIs(err, async.ErrTooLessDone)
Expand All @@ -446,17 +448,9 @@ func TestUnlock(t *testing.T) {
}

func TestTopic(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
peers, nodes, clean, err := createCluster(ctx, 5)

peers, nodes, clean, err := createCluster(5)
require.NoError(t, err)

defer func() {
for _, c := range clean {
c()
}
}()
defer clean()

tests := []struct {
name string
Expand Down Expand Up @@ -516,7 +510,9 @@ func TestTopic(t *testing.T) {
r.Equal("freeze", m.lease.Key)

nodes[0].Stop()
defer nodes[0].Start() // nolint: errcheck
nodes[1].Stop()
defer nodes[1].Start() // nolint: errcheck

err = m.Freeze(context.Background(), "freeze")
r.NoError(err)
Expand Down Expand Up @@ -560,8 +556,11 @@ func TestTopic(t *testing.T) {
r.Equal("freeze", m.lease.Key)

nodes[0].Stop()
defer nodes[0].Start() // nolint: errcheck
nodes[1].Stop()
defer nodes[1].Start() // nolint: errcheck
nodes[2].Stop()
defer nodes[2].Start() // nolint: errcheck

err = m.Freeze(context.Background(), "freeze")
r.ErrorIs(err, async.ErrTooLessDone)
Expand Down
12 changes: 7 additions & 5 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dlm

import (
"log/slog"
"net"
"net/rpc"
"sync"

Expand All @@ -14,7 +15,6 @@ func NewNode(addr string, db *sqle.DB, options ...NodeOption) *Node {
db: db,
frozen: make(map[string]struct{}),
logger: slog.Default(),
close: make(chan struct{}),
}

for _, o := range options {
Expand All @@ -25,12 +25,14 @@ func NewNode(addr string, db *sqle.DB, options ...NodeOption) *Node {
}

type Node struct {
mu sync.Mutex
mu sync.RWMutex
db *sqle.DB
logger *slog.Logger
frozen map[string]struct{}

addr string
server *rpc.Server
close chan struct{}
stopped bool

addr string
listener net.Listener
server *rpc.Server
}
55 changes: 32 additions & 23 deletions node_svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,59 +9,68 @@ import (
)

// Start start the node and its RPC service
func (n *Node) Start(ctx context.Context) error {
func (n *Node) Start() error {
n.mu.Lock()
defer n.mu.Unlock()

l, err := net.Listen("tcp", n.addr)
if err != nil {
return err
}

_, err = n.db.ExecContext(ctx, CreateTableLease)
_, err = n.db.ExecContext(context.Background(), CreateTableLease)
if err != nil {
return err
}

_, err = n.db.ExecContext(ctx, CreateTableTopic)
_, err = n.db.ExecContext(context.Background(), CreateTableTopic)
if err != nil {
return err
}

n.server = rpc.NewServer()
go n.waitClose(ctx, l)
go n.waitRequest(l)

n.listener = l
go n.waitRequest()
n.stopped = false
n.logger.Info("dlm: node is running")
return n.server.RegisterName("dlm", n)
}

// Stop stop the node and its RPC service
func (n *Node) Stop() {
go func() {
n.close <- struct{}{}
}()
n.logger.Info("dlm: node stopped")
n.mu.Lock()
defer n.mu.Unlock()
n.listener.Close()
n.stopped = true
n.logger.Info("dlm: node is stopped")
}

func (n *Node) isStopped() bool {
n.mu.RLock()
defer n.mu.RUnlock()

return n.stopped
}

func (n *Node) waitRequest(l net.Listener) {
func (n *Node) waitRequest() {

for {
conn, err := l.Accept()
conn, err := n.listener.Accept()

if err == nil {

if n.isStopped() {
return

Check warning on line 63 in node_svc.go

View check run for this annotation

Codecov / codecov/patch

node_svc.go#L63

Added line #L63 was not covered by tests
}

go n.server.ServeConn(conn)

} else {
if errors.Is(err, net.ErrClosed) {
return
}

n.logger.Warn("dlm: wait request", slog.String("err", err.Error()), slog.String("addr", n.addr))
}

}
}

func (n *Node) waitClose(ctx context.Context, l net.Listener) {

select {
case <-n.close:
case <-ctx.Done():
}

l.Close()
}
14 changes: 4 additions & 10 deletions node_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package dlm

import (
"context"
"database/sql"
"net"
"os"
Expand Down Expand Up @@ -48,16 +47,13 @@ func createSqlite3() (*sql.DB, func(), error) {
}

func TestLease(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

db, clean, err := createSqlite3()
require.NoError(t, err)
defer clean()

n := NewNode(getFreeAddr(), sqle.Open(db))
err = n.Start(ctx)
err = n.Start()
require.NoError(t, err)
defer n.Stop()

walletTerms := sqle.Duration(5 * time.Second)
userTerms := sqle.Duration(3 * time.Second)
Expand Down Expand Up @@ -233,16 +229,14 @@ func TestLease(t *testing.T) {
}

func TestNodeTopic(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

db, clean, err := createSqlite3()
require.NoError(t, err)
defer clean()

n := NewNode(getFreeAddr(), sqle.Open(db))
err = n.Start(ctx)
err = n.Start()
require.NoError(t, err)
defer n.Stop()

terms := sqle.Duration(5 * time.Second)

Expand Down

0 comments on commit 47acd96

Please sign in to comment.