From 75bf8a9ab0e5f78649eb2e0620aad497d3f297e8 Mon Sep 17 00:00:00 2001 From: Lz Date: Tue, 19 Mar 2024 09:24:15 +0800 Subject: [PATCH] fix(node): used sql instead raft as store (#1) --- README.md | 9 +- cluster.go | 100 --------------- cluster_option.go | 17 --- cluster_test.go | 174 -------------------------- cmd.go | 12 -- dlm.go | 40 +++--- go.mod | 21 ++-- go.sum | 145 ++++------------------ lease.go | 11 +- mutex.go | 85 +++++++------ mutex_option.go | 10 +- mutext_test.go | 97 ++++++++++++++- node.go | 122 ++----------------- node_db.go | 83 +++++++++++++ node_fsm.go | 67 ---------- node_option.go | 19 +-- node_rpc.go | 125 ++++--------------- node_svc.go | 88 +++----------- node_test.go | 304 ++++++++++++++++++++++++---------------------- request.go | 4 - snapshot.go | 40 ------ 21 files changed, 524 insertions(+), 1049 deletions(-) delete mode 100644 cluster.go delete mode 100644 cluster_option.go delete mode 100644 cluster_test.go delete mode 100644 cmd.go create mode 100644 node_db.go delete mode 100644 node_fsm.go delete mode 100644 snapshot.go diff --git a/README.md b/README.md index cdad84c..ab49b52 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,12 @@ # dlm -Distributed Lock Manager +a fault-tolerant distributed lock manager for Go + +![License](https://img.shields.io/badge/license-MIT-green.svg) +[![Tests](https://github.com/yaitoo/dlm/actions/workflows/tests.yml/badge.svg)](https://github.com/yaitoo/dlm/actions/workflows/tests.yml) +[![Go Reference](https://pkg.go.dev/badge/github.com/yaitoo/dlm.svg)](https://pkg.go.dev/github.com/yaitoo/dlm) +[![Codecov](https://codecov.io/gh/yaitoo/dlm/branch/main/graph/badge.svg)](https://codecov.io/gh/yaitoo/dlm) +[![GitHub Release](https://img.shields.io/github/v/release/yaitoo/dlm)](https://github.com/yaitoo/dlm/blob/main/CHANGELOG.md) +[![Go Report Card](https://goreportcard.com/badge/yaitoo/dlm)](http://goreportcard.com/report/yaitoo/dlm) inspired by diff --git a/cluster.go b/cluster.go deleted file mode 100644 index 30f2baa..0000000 --- a/cluster.go +++ /dev/null @@ -1,100 +0,0 @@ -package dlm - -import ( - "context" - "log/slog" - "net/rpc" - "slices" - "time" - - "github.com/hashicorp/raft" - "github.com/yaitoo/async" -) - -type Cluster struct { - terms map[string]time.Duration - peers []Peer -} - -func (c *Cluster) Start(n *Node, options ...ClusterOption) error { - - for _, o := range options { - o(c) - } - - if len(n.raft.GetConfiguration().Configuration().Servers) == 0 { - - configuration := raft.Configuration{} - - configuration.Servers = []raft.Server{ - { - ID: raft.ServerID(n.raftID), - Address: raft.ServerAddress(n.raftAddr), - }, - } - - f := n.raft.BootstrapCluster(configuration) - - err := f.Error() - if err != nil { - return err - } - } - - if !slices.ContainsFunc(c.peers, func(p Peer) bool { - return p.RaftID == n.raftID - }) { - c.peers = append(c.peers, Peer{RaftID: n.raftID, RaftAddr: n.raftAddr, Addr: n.addr}) - } - - ok := <-n.raft.LeaderCh() - if ok { - var err error - for k, v := range c.terms { - req := TermsRequest{Topic: k, TTL: v} - err = n.SetTerms(req, &ok) - if err != nil { - return err - } - } - - return nil - } - - return ErrClusterNotStarted -} - -func (c *Cluster) Join(ctx context.Context, p Peer) error { - a := async.NewA() - - var exists bool - for _, peer := range c.peers { - - if peer.RaftID == p.RaftID { - exists = true - } - - c, err := connect(ctx, peer.Addr, DefaultRaftTimeout) - if err != nil { - Logger.Warn("dlm: peers are unreachable", slog.String("peer", peer.Addr)) - continue - } - - a.Add(func(c *rpc.Client) func(ctx context.Context) error { - return func(ctx context.Context) error { - var s bool - err := c.Call("dlm.Join", p, &s) - return err - } - }(c)) - } - - _, err := a.WaitAny(ctx) - - if err == nil && !exists { - c.peers = append(c.peers, p) - } - - return err - -} diff --git a/cluster_option.go b/cluster_option.go deleted file mode 100644 index 81cb4b5..0000000 --- a/cluster_option.go +++ /dev/null @@ -1,17 +0,0 @@ -package dlm - -import "time" - -type ClusterOption func(c *Cluster) - -func WithLeaseTerms(terms map[string]time.Duration) ClusterOption { - return func(c *Cluster) { - c.terms = terms - } -} - -func WithPeers(peers ...Peer) ClusterOption { - return func(c *Cluster) { - c.peers = peers - } -} diff --git a/cluster_test.go b/cluster_test.go deleted file mode 100644 index ee81b25..0000000 --- a/cluster_test.go +++ /dev/null @@ -1,174 +0,0 @@ -package dlm - -import ( - "context" - "net" - "os" - "strconv" - "testing" - "time" - - "github.com/hashicorp/raft" - "github.com/stretchr/testify/require" -) - -func getFreeAddr() string { - l, err := net.Listen("tcp", ":0") - - if err != nil { - return "" - } - - defer l.Close() - return "127.0.0.1:" + strconv.Itoa(l.Addr().(*net.TCPAddr).Port) -} - -func TestCluster(t *testing.T) { - tests := []struct { - name string - setup func(ctx context.Context) ([]*Node, error) - assert func(re *require.Assertions, nodes []*Node) - }{ - { - name: "single_node_should_work", - setup: func(ctx context.Context) ([]*Node, error) { - n := NewNode(getFreeAddr(), WithRaft("1", getFreeAddr()), WithDataDir("./.db"), WithLogger(os.Stderr)) - err := n.Start(ctx) - if err != nil { - return nil, err - } - - c := &Cluster{} - err = c.Start(n) - if err != nil { - return nil, err - } - - return []*Node{n}, nil - }, - assert: func(re *require.Assertions, nodes []*Node) { - re.True(nodes[0].raft.State() == raft.Leader) - }, - }, - { - name: "multi_nodes_should_work", - setup: func(ctx context.Context) ([]*Node, error) { - - var peers []Peer - for i := 0; i < 9; i++ { - peers = append(peers, Peer{ - RaftID: "n" + strconv.Itoa(i), - RaftAddr: getFreeAddr(), - Addr: getFreeAddr(), - }) - } - - var err error - - var nodes []*Node - for i := 0; i < 9; i++ { - n := NewNode(peers[i].Addr, WithRaft(peers[i].RaftID, peers[i].RaftAddr), WithLogger(os.Stderr)) - err = n.Start(ctx) - - if err != nil { - return nil, err - } - - err = n.Serve(ctx) - if err != nil { - return nil, err - } - - nodes = append(nodes, n) - } - - c := &Cluster{} - err = c.Start(nodes[0], WithPeers(peers[0:3]...)) - if err != nil { - return nil, err - } - - for i := 1; i < 9; i++ { - err = c.Join(ctx, peers[i]) - if err != nil { - return nil, err - } - - } - - return nodes, nil - }, - assert: func(re *require.Assertions, nodes []*Node) { - - var id string - - for _, n := range nodes { - if n.raft.State() == raft.Leader { - - if id == "" { - id = n.raftID - } else { - re.Fail("leader is not unique") - } - } - - } - - re.NotEmpty(id, "leader is empty") - }, - }, - { - name: "with_terms_should_work", - setup: func(ctx context.Context) ([]*Node, error) { - n := NewNode(getFreeAddr(), WithRaft("1", getFreeAddr()), WithLogger(os.Stderr)) - err := n.Start(ctx) - if err != nil { - return nil, err - } - - c := &Cluster{} - err = c.Start(n, WithLeaseTerms(map[string]time.Duration{ - "wallet": 5 * time.Second, - "user": 3 * time.Second, - })) - if err != nil { - return nil, err - } - - return []*Node{n}, nil - - }, - assert: func(re *require.Assertions, nodes []*Node) { - - leader := nodes[0] - - re.True(leader.raft.State() == raft.Leader) - ttl, err := leader.getTerms("wallet") - re.NoError(err) - re.Equal(5*time.Second, ttl) - - ttl, err = leader.getTerms("user") - re.NoError(err) - re.Equal(3*time.Second, ttl) - - }, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - - re := require.New(t) - nodes, err := test.setup(ctx) - re.NoError(err) - - test.assert(re, nodes) - - cancel() - os.RemoveAll("./.db") - - }) - } - -} diff --git a/cmd.go b/cmd.go deleted file mode 100644 index bd93770..0000000 --- a/cmd.go +++ /dev/null @@ -1,12 +0,0 @@ -package dlm - -const ( - CmdSet = "SET" - CmdRemove = "REM" -) - -type cmd struct { - Name string - Key string - Value []byte -} diff --git a/dlm.go b/dlm.go index 5b4dbab..24aca7f 100644 --- a/dlm.go +++ b/dlm.go @@ -7,26 +7,34 @@ import ( ) var ( - ErrExpiredLease = errors.New("dlm: lease expires") - ErrNoLease = errors.New("dlm: no lease") - ErrNotYourLease = errors.New("dlm: not your lease") - ErrStaleNonce = errors.New("dlm: stale nonce") - ErrInvalidTopic = errors.New("dlm: topic can't starts with @") - ErrNotRaftLeader = errors.New("dlm: not raft leader") - ErrAlreadyLocked = errors.New("dlm: already locked by others") - ErrNoConsensus = errors.New("dlm: no consensus") - ErrClusterNotStarted = errors.New("dlm: cluster is not started") - ErrFrozenTopic = errors.New("dlm: topic is frozen") + ErrExpiredLease = errors.New("dlm: lease expires") + ErrNoLease = errors.New("dlm: no lease") + ErrNotYourLease = errors.New("dlm: not your lease") + ErrLeaseExists = errors.New("dlm: lease exists") + + ErrFrozenTopic = errors.New("dlm: topic is frozen") + + ErrBadDatabase = errors.New("dlm: bad database operation") ) var ( - DefaultRaftTimeout = 3 * time.Second - DefaultTimeout = 3 * time.Second - DefaultLeaseTerm = 5 * time.Second - DefaultRetainSnapshotCount = 2 - Logger = slog.Default() + DefaultTimeout = 3 * time.Second + DefaultLeaseTerm = 5 * time.Second + + Logger = slog.Default() ) const ( - TopicTerms = "@terms:" + CreateTableLease = "CREATE TABLE IF NOT EXISTS dlm_lease(" + + "`topic` varchar(20) NOT NULL," + + "`key` varchar(50) NOT NULL," + + "`lessee` varchar(36) NOT NULL," + + "`since` int NOT NULL DEFAULT '0'," + + "`ttl` int NOT NULL DEFAULT '0'," + + "PRIMARY KEY (topic, key));" + + CreateTableTopic = "CREATE TABLE IF NOT EXISTS dlm_topic(" + + "`topic` varchar(20) NOT NULL," + + "`ttl` int NOT NULL DEFAULT '0'," + + "PRIMARY KEY (topic));" ) diff --git a/go.mod b/go.mod index 69ddbe2..5e6c91d 100644 --- a/go.mod +++ b/go.mod @@ -3,25 +3,20 @@ module github.com/yaitoo/dlm go 1.18 require ( - github.com/hashicorp/raft v1.6.1 - github.com/hashicorp/raft-boltdb/v2 v2.3.0 + github.com/mattn/go-sqlite3 v1.14.22 github.com/stretchr/testify v1.9.0 github.com/yaitoo/async v1.0.4 + github.com/yaitoo/sqle v1.3.1 ) require ( - github.com/armon/go-metrics v0.4.1 // indirect - github.com/boltdb/bolt v1.3.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/fatih/color v1.13.0 // indirect - github.com/hashicorp/go-hclog v1.6.2 // indirect - github.com/hashicorp/go-immutable-radix v1.0.0 // indirect - github.com/hashicorp/go-msgpack/v2 v2.1.1 // indirect - github.com/hashicorp/golang-lru v0.5.0 // indirect - github.com/mattn/go-colorable v0.1.12 // indirect - github.com/mattn/go-isatty v0.0.14 // indirect + github.com/iancoleman/strcase v0.3.0 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - go.etcd.io/bbolt v1.3.5 // indirect - golang.org/x/sys v0.13.0 // indirect + github.com/rs/zerolog v1.32.0 // indirect + golang.org/x/sys v0.18.0 // indirect + gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 6362781..a2f425f 100644 --- a/go.sum +++ b/go.sum @@ -1,136 +1,41 @@ -github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= -github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= -github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= -github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= -github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= -github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJA= -github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4= -github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= -github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= -github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= -github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= -github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= -github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= -github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= -github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= -github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= -github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= -github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= -github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= -github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= -github.com/hashicorp/go-hclog v1.6.2 h1:NOtoftovWkDheyUM/8JW3QMiXyxJK3uHRK7wV04nD2I= -github.com/hashicorp/go-hclog v1.6.2/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= -github.com/hashicorp/go-immutable-radix v1.0.0 h1:AKDB1HM5PWEA7i4nhcpwOrO2byshxBjXVn/J/3+z5/0= -github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= -github.com/hashicorp/go-msgpack v0.5.5 h1:i9R9JSrqIz0QVLz3sz+i3YJdT7TTSLcfLLzJi9aZTuI= -github.com/hashicorp/go-msgpack/v2 v2.1.1 h1:xQEY9yB2wnHitoSzk/B9UjXWRQ67QKu5AOm8aFp8N3I= -github.com/hashicorp/go-msgpack/v2 v2.1.1/go.mod h1:upybraOAblm4S7rx0+jeNy+CWWhzywQsSRV5033mMu4= -github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= -github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM= -github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= -github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo= -github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= -github.com/hashicorp/raft v1.6.1 h1:v/jm5fcYHvVkL0akByAp+IDdDSzCNCGhdO6VdB56HIM= -github.com/hashicorp/raft v1.6.1/go.mod h1:N1sKh6Vn47mrWvEArQgILTyng8GoDRNYlgKyK7PMjs0= -github.com/hashicorp/raft-boltdb v0.0.0-20230125174641-2a8082862702 h1:RLKEcCuKcZ+qp2VlaaZsYZfLOmIiuJNpEi48Rl8u9cQ= -github.com/hashicorp/raft-boltdb/v2 v2.3.0 h1:fPpQR1iGEVYjZ2OELvUHX600VAK5qmdnDEv3eXOwZUA= -github.com/hashicorp/raft-boltdb/v2 v2.3.0/go.mod h1:YHukhB04ChJsLHLJEUD6vjFyLX2L3dsX3wPBZcX4tmc= -github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= -github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= -github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= -github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= -github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= -github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/iancoleman/strcase v0.3.0 h1:nTXanmYxhfFAMjZL34Ov6gkzEsSJZ5DbhxWjvSASxEI= +github.com/iancoleman/strcase v0.3.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= -github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40= -github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= -github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= -github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y= -github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= -github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= -github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= -github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= -github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= -github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= -github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= -github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= -github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= +github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= -github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= -github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= -github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= -github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= -github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4= -github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= -github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= -github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= -github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= -github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= +github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/rs/zerolog v1.32.0 h1:keLypqrlIjaFsbmJOBdB/qvyF8KEtCWHwobLp5l/mQ0= +github.com/rs/zerolog v1.32.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/yaitoo/async v1.0.4 h1:u+SWuJcSckgBOcMjMYz9IviojeCatDrdni3YNGLCiHY= github.com/yaitoo/async v1.0.4/go.mod h1:IpSO7Ei7AxiqLxFqDjN4rJaVlt8wm4ZxMXyyQaWmM1g= -go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0= -go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= -golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= +github.com/yaitoo/sqle v1.3.1 h1:GYTAojaMyya/xcuM0jOQeiJztj09BWdqVYtTQvvrTmY= +github.com/yaitoo/sqle v1.3.1/go.mod h1:eAnLv2XhUUlIVFVfvK/kmcuzxUDHSVTv9OO8huO0WII= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= -gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/lease.go b/lease.go index 6b6d118..c5a64cd 100644 --- a/lease.go +++ b/lease.go @@ -1,6 +1,10 @@ package dlm -import "time" +import ( + "time" + + "github.com/yaitoo/sqle" +) // Lease lock period on the key type Lease struct { @@ -9,13 +13,12 @@ type Lease struct { Lessee string Since int64 - TTL time.Duration - Nonce uint64 + TTL sqle.Duration // Only available on mutex ExpiresOn time.Time `json:"-"` } func (l *Lease) IsLive() bool { - return time.Now().Before(time.Unix(l.Since, 0).Add(l.TTL)) + return time.Now().Before(time.Unix(l.Since, 0).Add(l.TTL.Duration())) } diff --git a/mutex.go b/mutex.go index 4a73b64..32844f9 100644 --- a/mutex.go +++ b/mutex.go @@ -3,6 +3,7 @@ package dlm import ( "context" "errors" + "math" "net/rpc" "strings" "sync" @@ -12,40 +13,38 @@ import ( ) func New(id, topic, key string, options ...MutexOption) *Mutex { - // id := os.Getenv("DLM-MUTEX-ID") - - // if id == "" { - // id, _ = uuid.GenerateUUID() - // if id == "" { - // id = fmt.Sprintf("%v-%v", os.Getegid(), time.Now().UnixNano()) - // } - // } + m := &Mutex{ id: id, topic: strings.ToLower(topic), key: strings.ToLower(key), done: make(chan struct{}), timeout: DefaultTimeout, + ttl: DefaultLeaseTerm, } for _, o := range options { o(m) } + m.consensus = int(math.Ceil(float64(len(m.peers)) / 2)) + return m } type Mutex struct { mu sync.RWMutex - id string - topic string - key string - dispatchers []string - timeout time.Duration + id string + topic string + key string + peers []string + timeout time.Duration + ttl time.Duration - cluster []*rpc.Client - done chan struct{} + consensus int + cluster []*rpc.Client + done chan struct{} lease Lease } @@ -53,7 +52,7 @@ type Mutex struct { func (m *Mutex) connect(ctx context.Context) error { if m.cluster == nil { a := async.New[*rpc.Client]() - for _, d := range m.dispatchers { + for _, d := range m.peers { a.Add(func(addr string) func(context.Context) (*rpc.Client, error) { return func(ctx context.Context) (*rpc.Client, error) { return connect(ctx, addr, m.timeout) @@ -62,7 +61,7 @@ func (m *Mutex) connect(ctx context.Context) error { } cluster, _, err := a.Wait(ctx) - if len(cluster) > 0 { + if len(cluster) >= m.consensus { m.cluster = cluster return nil } @@ -73,7 +72,7 @@ func (m *Mutex) connect(ctx context.Context) error { return nil } -func (m *Mutex) Lock(ctx context.Context) (context.Context, error) { +func (m *Mutex) Lock(ctx context.Context) (context.Context, context.CancelFunc, error) { m.mu.Lock() defer m.mu.Unlock() @@ -85,7 +84,7 @@ func (m *Mutex) Lock(ctx context.Context) (context.Context, error) { err := m.connect(ctx) if err != nil { - return nil, err + return nil, nil, err } for _, c := range m.cluster { @@ -99,25 +98,28 @@ func (m *Mutex) Lock(ctx context.Context) (context.Context, error) { } start := time.Now() - t, _, err := a.WaitAny(ctx) + result, _, err := a.WaitN(ctx, m.consensus) + if err != nil { - return nil, err + return nil, nil, err } + + t := result[0] now := time.Now() - t.ExpiresOn = now.Add(t.TTL - time.Until(start)) + t.ExpiresOn = now.Add(t.TTL.Duration() - time.Until(start)) - if !now.After(t.ExpiresOn) { - return nil, ErrExpiredLease + if !now.Before(t.ExpiresOn) { + return nil, nil, ErrExpiredLease } m.lease = t - wCtx, wCancel := context.WithCancel(context.Background()) + statusCtx, statusCancel := context.WithCancel(context.Background()) - go m.keepalive(wCtx, wCancel) - go m.waitExpires(wCtx, wCancel) + go m.keepalive(statusCtx, statusCancel) + go m.waitExpires(statusCtx, statusCancel) - return ctx, nil + return statusCtx, statusCancel, nil } @@ -142,7 +144,7 @@ func (m *Mutex) Unlock(ctx context.Context) error { } m.done <- struct{}{} - _, _, err := a.WaitAny(ctx) + _, _, err := a.WaitN(ctx, m.consensus) if err != nil { return err } @@ -154,6 +156,7 @@ func (m *Mutex) createRequest() LockRequest { ID: m.id, Topic: m.topic, Key: m.key, + TTL: m.ttl, } } @@ -179,22 +182,19 @@ func (m *Mutex) Renew(ctx context.Context) error { defer cancel() start := time.Now() - t, _, err := a.WaitAny(ctx) + result, _, err := a.WaitN(ctx, m.consensus) if err != nil { return err } now := time.Now() - t.ExpiresOn = now.Add(t.TTL - time.Until(start)) + t := result[0] + t.ExpiresOn = now.Add(t.TTL.Duration() - time.Until(start)) if !now.After(t.ExpiresOn) { return ErrExpiredLease } - if t.Nonce != m.lease.Nonce { - return ErrStaleNonce - } - m.lease = t return nil } @@ -207,10 +207,15 @@ func (m *Mutex) waitExpires(ctx context.Context, cancel context.CancelFunc) { expiresOn = m.lease.ExpiresOn m.mu.RUnlock() - <-time.After(time.Until(expiresOn)) - - if !expiresOn.Before(expiresOn) { + select { + case <-m.done: return + case <-ctx.Done(): + return + case <-time.After(time.Until(expiresOn)): + if !expiresOn.Before(expiresOn) { + return + } } } } @@ -228,6 +233,8 @@ func (m *Mutex) keepalive(ctx context.Context, cancel context.CancelFunc) { select { case <-m.done: return + case <-ctx.Done(): + return case <-time.After(1 * time.Second): // lease already expires if !expiresOn.Before(expiresOn) { @@ -235,7 +242,7 @@ func (m *Mutex) keepalive(ctx context.Context, cancel context.CancelFunc) { } err = m.Renew(context.Background()) - if errors.Is(err, ErrStaleNonce) || errors.Is(err, ErrExpiredLease) { + if errors.Is(err, ErrExpiredLease) { return } } diff --git a/mutex_option.go b/mutex_option.go index 26dfd4e..2e147b4 100644 --- a/mutex_option.go +++ b/mutex_option.go @@ -4,9 +4,15 @@ import "time" type MutexOption func(m *Mutex) -func WithDispatcher(dispatchers ...string) MutexOption { +func WithPeers(peers ...string) MutexOption { return func(m *Mutex) { - m.dispatchers = dispatchers + m.peers = peers + } +} + +func WithTTL(d time.Duration) MutexOption { + return func(m *Mutex) { + m.ttl = d } } diff --git a/mutext_test.go b/mutext_test.go index f59e52a..3735251 100644 --- a/mutext_test.go +++ b/mutext_test.go @@ -1,7 +1,100 @@ package dlm -import "testing" +import ( + "context" + "testing" + "time" -func TestMutex(t *testing.T) { + "github.com/stretchr/testify/require" + "github.com/yaitoo/async" + "github.com/yaitoo/sqle" +) + +func createCluster(ctx context.Context, num int) ([]string, []*Node, []func(), error) { + var peers []string + var nodes []*Node + var clean []func() + + for i := 0; i < num; i++ { + db, fn, err := createSqlite3() + if err != nil { + return nil, nil, clean, err + } + clean = append(clean, fn) + n := NewNode(getFreeAddr(), sqle.Open(db)) + err = n.Start(ctx) + if err != nil { + return nil, nil, clean, err + } + peers = append(peers, n.addr) + nodes = append(nodes, n) + } + + return peers, nodes, clean, nil +} + +func TestLock(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + peers, nodes, clean, err := createCluster(ctx, 5) + + require.NoError(t, err) + + defer func() { + for _, c := range clean { + c() + } + }() + + tests := []struct { + name string + run func(*require.Assertions) + }{ + { + name: "lock_should_work", + run: func(r *require.Assertions) { + m := New("lock_should_work", "wallet", "lock_should_work", WithPeers(peers...), WithTTL(10*time.Second)) + _, cancel, err := m.Lock(context.TODO()) + defer cancel() + r.NoError(err) + r.Equal(10*time.Second, m.lease.TTL.Duration()) + r.Equal("wallet", m.lease.Topic) + r.Equal("lock_should_work", m.lease.Key) + + m2 := New("lock_should_work_2", "wallet", "lock_should_work", WithPeers(peers...)) + _, _, err = m2.Lock(context.TODO()) + + r.Error(err, async.ErrTooLessDone) + }, + }, + { + name: "lock_should_work_even_minority_nodes_are_down", + run: func(r *require.Assertions) { + m := New("lock_should_work", "wallet", "minority_nodes_are_down", WithPeers(peers...), WithTTL(10*time.Second)) + + nodes[0].Stop() + nodes[1].Stop() + + _, cancel, err := m.Lock(context.TODO()) + defer cancel() + r.NoError(err) + r.Equal(10*time.Second, m.lease.TTL.Duration()) + r.Equal("wallet", m.lease.Topic) + r.Equal("minority_nodes_are_down", m.lease.Key) + + m2 := New("lock_should_work_2", "wallet", "minority_nodes_are_down", WithPeers(peers...)) + _, _, err = m2.Lock(context.TODO()) + + r.Error(err, async.ErrTooLessDone) + + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + test.run(require.New(t)) + }) + } } diff --git a/node.go b/node.go index e0be3e5..60848cf 100644 --- a/node.go +++ b/node.go @@ -1,132 +1,36 @@ package dlm import ( - "encoding/json" - "io" + "log/slog" "net/rpc" - "os" "sync" - "time" - "github.com/hashicorp/raft" + "github.com/yaitoo/sqle" ) -func NewNode(addr string, options ...NodeOption) *Node { +func NewNode(addr string, db *sqle.DB, options ...NodeOption) *Node { n := &Node{ - addr: addr, - m: make(map[string][]byte), + addr: addr, + db: db, + frozen: make(map[string]struct{}), + logger: slog.Default(), + close: make(chan struct{}), } - n.logger = os.Stderr - for _, o := range options { o(n) } - if n.raftID == "" { - n.raftID = n.raftAddr - } - return n } type Node struct { - mu sync.Mutex - - raftID string - raftAddr string - - dir string - logger io.Writer + mu sync.Mutex + db *sqle.DB + logger *slog.Logger + frozen map[string]struct{} addr string server *rpc.Server - - m map[string][]byte // The key-value store for the system. - raft *raft.Raft // The consensus mechanism -} - -func (n *Node) isLeader() bool { - return n.raft.State() == raft.Leader -} - -func (n *Node) getLease(dbKey string) (Lease, error) { - var l Lease - buf, ok := n.m[dbKey] - if !ok { - return l, ErrNoLease - } - - err := json.Unmarshal(buf, &l) - if err != nil { - return l, err - } - return l, nil -} - -func (n *Node) getTerms(topic string) (time.Duration, error) { - dbKey := TopicTerms + topic - var ttl time.Duration - - buf, ok := n.m[dbKey] - if ok { - err := json.Unmarshal(buf, &ttl) - if err != nil { - return 0, err - } - } else { - ttl = DefaultLeaseTerm - } - - if ttl < 0 { - return 0, ErrFrozenTopic - } - - return ttl, nil -} - -func (n *Node) applyCmdLease(name, dbKey string, l Lease) error { - buf, err := json.Marshal(l) - if err != nil { - return err - } - cmd := cmd{ - Name: name, - Key: dbKey, - Value: buf, - } - buf, _ = json.Marshal(cmd) - - f := n.raft.Apply(buf, DefaultTimeout) - - return f.Error() -} - -func (n *Node) applyCmdTerms(name, dbKey string, ttl time.Duration) error { - buf, err := json.Marshal(ttl) - if err != nil { - return err - } - cmd := cmd{ - Name: name, - Key: dbKey, - Value: buf, - } - buf, _ = json.Marshal(cmd) - - f := n.raft.Apply(buf, DefaultTimeout) - - return f.Error() -} - -func (n *Node) applyCmdRemove(dbKey string) error { - cmd := cmd{ - Name: CmdRemove, - Key: dbKey, - } - buf, _ := json.Marshal(cmd) - - f := n.raft.Apply(buf, DefaultTimeout) - - return f.Error() + close chan struct{} } diff --git a/node_db.go b/node_db.go new file mode 100644 index 0000000..d26a7d5 --- /dev/null +++ b/node_db.go @@ -0,0 +1,83 @@ +package dlm + +import ( + "context" + "database/sql" + "errors" + "log/slog" + + "github.com/yaitoo/sqle" +) + +func (n *Node) createLease(l Lease) error { + cmd := sqle.New(). + Insert("dlm_lease"). + Set("topic", l.Topic). + Set("key", l.Key). + Set("lessee", l.Lessee). + Set("since", l.Since). + Set("ttl", l.TTL).End() + + _, err := n.db.ExecBuilder(context.Background(), cmd) + if err != nil { + n.logger.Warn("dlm: create lease", slog.String("err", err.Error())) + return ErrBadDatabase + } + + return nil +} + +func (n *Node) updateLease(l Lease) error { + cmd := sqle.New(). + Update("dlm_lease"). + Set("topic", l.Topic). + Set("key", l.Key). + Set("lessee", l.Lessee). + Set("since", l.Since). + Set("ttl", l.TTL). + Where("topic = {topic} and key ={key}").End() + + _, err := n.db.ExecBuilder(context.Background(), cmd) + if err != nil { + n.logger.Warn("dlm: update lease", slog.String("err", err.Error())) + return ErrBadDatabase + } + + return nil +} + +func (n *Node) getLease(topic, key string) (Lease, error) { + var l Lease + + cmd := sqle.New(). + Select("dlm_lease"). + Where("topic = {topic} AND key = {key}"). + Param("topic", topic). + Param("key", key) + + err := n.db.QueryRowBuilder(context.Background(), cmd).Bind(&l) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return l, ErrNoLease + } + + n.logger.Warn("dlm: get lease", slog.String("err", err.Error())) + return l, ErrBadDatabase + } + + return l, nil +} + +func (n *Node) deleteLease(topic, key string) error { + _, err := n.db.ExecBuilder(context.Background(), sqle.New(). + Delete("dlm_lease"). + Where("topic = {topic} AND key = {key}"). + Param("topic", topic). + Param("key", key)) + if err != nil { + n.logger.Warn("dlm: get lease", slog.String("err", err.Error())) + return ErrBadDatabase + } + + return nil +} diff --git a/node_fsm.go b/node_fsm.go deleted file mode 100644 index abfa425..0000000 --- a/node_fsm.go +++ /dev/null @@ -1,67 +0,0 @@ -package dlm - -import ( - "encoding/json" - "fmt" - "io" - - "github.com/hashicorp/raft" -) - -// Apply applies a Raft log entry to the key-value store. -func (n *Node) Apply(l *raft.Log) interface{} { - if !n.isLeader() { - n.mu.Lock() - defer n.mu.Unlock() - } - - var c cmd - if err := json.Unmarshal(l.Data, &c); err != nil { - panic(fmt.Sprintf("failed to unmarshal command: %s", err.Error())) - } - - switch c.Name { - case CmdSet: - return n.applySet(c.Key, c.Value) - case CmdRemove: - return n.applyDelete(c.Key) - default: - panic(fmt.Sprintf("unrecognized command name: %s", c.Name)) - } -} - -// Snapshot returns a snapshot of the key-value store. -func (n *Node) Snapshot() (raft.FSMSnapshot, error) { - n.mu.Lock() - defer n.mu.Unlock() - - // Clone the map. - o := make(map[string][]byte) - for k, v := range n.m { - o[k] = v - } - return &snapshot{store: o}, nil -} - -// Restore stores the key-value store to a previous state. -func (n *Node) Restore(rc io.ReadCloser) error { - o := make(map[string][]byte) - if err := json.NewDecoder(rc).Decode(&o); err != nil { - return err - } - - // Set the state from the snapshot, no lock required according to - // Hashicorp docs. - n.m = o - return nil -} - -func (n *Node) applySet(key string, value []byte) interface{} { - n.m[key] = value - return nil -} - -func (n *Node) applyDelete(key string) interface{} { - delete(n.m, key) - return nil -} diff --git a/node_option.go b/node_option.go index 599fcf2..972dca9 100644 --- a/node_option.go +++ b/node_option.go @@ -1,26 +1,13 @@ package dlm import ( - "io" + "log/slog" ) type NodeOption func(n *Node) -func WithRaft(id string, addr string) NodeOption { +func WithLogger(l *slog.Logger) NodeOption { return func(n *Node) { - n.raftID = id - n.raftAddr = addr - } -} - -func WithDataDir(dir string) NodeOption { - return func(n *Node) { - n.dir = dir - } -} - -func WithLogger(w io.Writer) NodeOption { - return func(n *Node) { - n.logger = w + n.logger = l } } diff --git a/node_rpc.go b/node_rpc.go index 4fa117e..2c1e640 100644 --- a/node_rpc.go +++ b/node_rpc.go @@ -2,37 +2,26 @@ package dlm import ( "errors" - "fmt" - "strings" "time" - "github.com/hashicorp/raft" + "github.com/yaitoo/sqle" ) func (n *Node) NewLock(req LockRequest, t *Lease) error { - if !n.isLeader() { - return ErrNotRaftLeader - } - - if strings.HasPrefix(req.Topic, "@") { // @ is reversed for LeaseTerms setting - return ErrInvalidTopic - } - n.mu.Lock() defer n.mu.Unlock() - ttl, err := n.getTerms(req.Topic) + _, ok := n.frozen[req.Topic] - if err != nil { - return err + if ok { + return ErrFrozenTopic } - dbKey := req.Topic + ":" + req.Key - lease, err := n.getLease(dbKey) + lease, err := n.getLease(req.Topic, req.Key) if err == nil { // exits a lease - if lease.Lessee != req.ID && lease.IsLive() { //someone else's lease is still live - return ErrAlreadyLocked + if lease.Lessee != req.ID && lease.IsLive() { // someone else's lease is still live + return ErrLeaseExists } } else { if !errors.Is(err, ErrNoLease) { @@ -45,12 +34,15 @@ func (n *Node) NewLock(req LockRequest, t *Lease) error { Lessee: req.ID, Topic: req.Topic, Key: req.Key, - TTL: ttl, + TTL: sqle.Duration(req.TTL), Since: time.Now().Unix(), - Nonce: n.raft.CommitIndex(), + } + if errors.Is(err, ErrNoLease) { + err = n.createLease(lease) + } else { + err = n.updateLease(lease) } - err = n.applyCmdLease(CmdSet, dbKey, lease) if err != nil { return err } @@ -63,21 +55,15 @@ func (n *Node) NewLock(req LockRequest, t *Lease) error { } func (n *Node) RenewLock(req LockRequest, t *Lease) error { - if !n.isLeader() { - return ErrNotRaftLeader - } - n.mu.Lock() defer n.mu.Unlock() - ttl, err := n.getTerms(req.Topic) - - if err != nil { - return err + _, ok := n.frozen[req.Topic] + if ok { + return ErrFrozenTopic } - dbKey := req.Topic + ":" + req.Key - lease, err := n.getLease(dbKey) + lease, err := n.getLease(req.Topic, req.Key) if err != nil { return err @@ -92,9 +78,9 @@ func (n *Node) RenewLock(req LockRequest, t *Lease) error { } lease.Since = time.Now().Unix() - lease.TTL = ttl + lease.TTL = sqle.Duration(req.TTL) - err = n.applyCmdLease(CmdSet, dbKey, lease) + err = n.updateLease(lease) if err != nil { return err } @@ -105,16 +91,10 @@ func (n *Node) RenewLock(req LockRequest, t *Lease) error { } func (n *Node) ReleaseLock(req LockRequest, ok *bool) error { - if !n.isLeader() { - return ErrNotRaftLeader - } - n.mu.Lock() defer n.mu.Unlock() - dbKey := req.Topic + ":" + req.Key - - lease, err := n.getLease(dbKey) + lease, err := n.getLease(req.Topic, req.Key) if err != nil { return err } @@ -123,7 +103,7 @@ func (n *Node) ReleaseLock(req LockRequest, ok *bool) error { return ErrNotYourLease } - err = n.applyCmdRemove(dbKey) + err = n.deleteLease(req.Topic, req.Key) if err != nil { return err @@ -134,71 +114,20 @@ func (n *Node) ReleaseLock(req LockRequest, ok *bool) error { return nil } -func (n *Node) SetTerms(req TermsRequest, ok *bool) error { - if !n.isLeader() { - return ErrNotRaftLeader - } +func (n *Node) Freeze(topic string, ok *bool) error { n.mu.Lock() defer n.mu.Unlock() + n.frozen[topic] = struct{}{} + *ok = true - return n.applyCmdTerms(CmdSet, TopicTerms+req.Topic, req.TTL) + return nil } -func (n *Node) RemoveTerms(req TermsRequest, ok *bool) error { - if !n.isLeader() { - return ErrNotRaftLeader - } +func (n *Node) Reset(topic string, ok *bool) error { n.mu.Lock() defer n.mu.Unlock() - err := n.applyCmdRemove(TopicTerms + req.Topic) - - if err != nil { - return err - } - - *ok = true - return nil -} - -// Join joins a new node, identified by id and located at addr, to this store. -// The node must be ready to respond to Raft communications at that address. -func (n *Node) Join(p Peer, ok *bool) error { - if !n.isLeader() { - return ErrNotRaftLeader - } - - nodeID := raft.ServerID(p.RaftID) - addr := raft.ServerAddress(p.RaftAddr) - - configFuture := n.raft.GetConfiguration() - if err := configFuture.Error(); err != nil { - return err - } - - for _, srv := range configFuture.Configuration().Servers { - // If a node already exists with either the joining node's ID or address, - // that node may need to be removed from the config first. - if srv.ID == nodeID || srv.Address == addr { - // However if *both* the ID and the address are the same, then nothing -- not even - // a join operation -- is needed. - if srv.Address == addr && srv.ID == nodeID { - Logger.Info("dlm: node %s at %s already member of cluster, ignoring join request", p.RaftID, p.RaftAddr) - *ok = true - return nil - } - - future := n.raft.RemoveServer(srv.ID, 0, 0) - if err := future.Error(); err != nil { - return fmt.Errorf("error removing existing node %s at %s: %s", nodeID, addr, err) - } - } - } - - f := n.raft.AddVoter(nodeID, addr, 0, 0) - if f.Error() != nil { - return f.Error() - } + delete(n.frozen, topic) *ok = true return nil diff --git a/node_svc.go b/node_svc.go index 10804b2..6664fd2 100644 --- a/node_svc.go +++ b/node_svc.go @@ -6,97 +6,38 @@ import ( "log/slog" "net" "net/rpc" - "os" - "path/filepath" - "time" - - "github.com/hashicorp/raft" - raftboltdb "github.com/hashicorp/raft-boltdb/v2" ) -// Start starts the node +// Start start the node and its RPC service func (n *Node) Start(ctx context.Context) error { - - // Setup Raft configuration. - config := raft.DefaultConfig() - config.LocalID = raft.ServerID(n.raftID) - - // Setup Raft communication. - addr, err := net.ResolveTCPAddr("tcp", n.raftAddr) - if err != nil { - return err - } - transport, err := raft.NewTCPTransport(n.raftAddr, addr, 3, 10*time.Second, n.logger) + l, err := net.Listen("tcp", n.addr) if err != nil { return err } - // Create the snapshot store. This allows the Raft to truncate the log. - var snapshots raft.SnapshotStore - // Create the log store and stable store. - var logStore raft.LogStore - var stableStore raft.StableStore - if n.dir == "" { - snapshots = raft.NewInmemSnapshotStore() - logStore = raft.NewInmemStore() - stableStore = raft.NewInmemStore() - } else { - _, err := os.Stat(n.dir) - if errors.Is(err, os.ErrNotExist) { - err = os.MkdirAll(n.dir, 0700) - if err != nil { - return err - } - } - - snapshots, err = raft.NewFileSnapshotStore(n.dir, DefaultRetainSnapshotCount, n.logger) - if err != nil { - return err - } - - path := filepath.Join(n.dir, "raft_"+n.raftID+".db") - - boltDB, err := raftboltdb.New(raftboltdb.Options{ - Path: path, - }) - if err != nil { - return err - } - logStore = boltDB - stableStore = boltDB - } - - // Instantiate the Raft systems. - ra, err := raft.NewRaft(config, n, logStore, stableStore, snapshots, transport) + _, err = n.db.ExecContext(ctx, CreateTableLease) if err != nil { return err } - n.raft = ra - - go func() { - <-ctx.Done() - - n.raft.Shutdown() - }() - return nil -} - -// Serve start the node's RPC service -func (n *Node) Serve(ctx context.Context) error { - l, err := net.Listen("tcp", n.addr) + _, err = n.db.ExecContext(ctx, CreateTableTopic) if err != nil { return err } n.server = rpc.NewServer() - go n.waitClose(ctx, l) go n.waitRequest(l) return n.server.RegisterName("dlm", n) } +// Stop stop the node and its RPC service +func (n *Node) Stop() { + n.close <- struct{}{} + n.logger.Info("dlm: node stopped") +} + func (n *Node) waitRequest(l net.Listener) { for { conn, err := l.Accept() @@ -107,13 +48,18 @@ func (n *Node) waitRequest(l net.Listener) { return } - Logger.Warn("wait request", slog.String("err", err.Error()), slog.String("node_id", n.raftID)) + n.logger.Warn("dlm: wait request", slog.String("err", err.Error()), slog.String("addr", n.addr)) } } } func (n *Node) waitClose(ctx context.Context, l net.Listener) { - <-ctx.Done() + + select { + case <-n.close: + case <-ctx.Done(): + } + l.Close() } diff --git a/node_test.go b/node_test.go index d120a39..cbd0c17 100644 --- a/node_test.go +++ b/node_test.go @@ -2,41 +2,65 @@ package dlm import ( "context" + "database/sql" + "net" + "os" + "strconv" "testing" "time" + _ "github.com/mattn/go-sqlite3" "github.com/stretchr/testify/require" + "github.com/yaitoo/sqle" ) -func TestNode(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - n := NewNode(getFreeAddr(), WithRaft("1", getFreeAddr())) - err := n.Start(ctx) - require.NoError(t, err) +func getFreeAddr() string { + l, err := net.Listen("tcp", ":0") + + if err != nil { + return "" + } + + defer l.Close() + return "127.0.0.1:" + strconv.Itoa(l.Addr().(*net.TCPAddr).Port) +} - // err = n.Serve(ctx) - // require.NoError(t, err) +func createSqlite3() (*sql.DB, func(), error) { + f, err := os.CreateTemp(".", "*.db") + f.Close() - // n2 := NewNode(getFreeAddr(), WithRaft("2", getFreeAddr())) - // err = n2.Start(ctx) - // require.NoError(t, err) + clean := func() { + os.Remove(f.Name()) //nolint + } + + if err != nil { + return nil, clean, err + } - // err = n2.Serve(ctx) - // require.NoError(t, err) + db, err := sql.Open("sqlite3", f.Name()) - walletTerms := 5 * time.Second - userTerms := 3 * time.Second + if err != nil { + return nil, clean, err + } + + return db, clean, nil + +} - c := &Cluster{} - err = c.Start(n, WithLeaseTerms(map[string]time.Duration{ - "wallet": walletTerms, - "user": userTerms, - })) +func TestLease(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + db, clean, err := createSqlite3() require.NoError(t, err) + defer clean() - // err = c.Join(ctx, Peer{Addr: n2.addr, RaftID: n2.raftID, RaftAddr: n2.raftAddr}) - // require.NoError(t, err) + n := NewNode(getFreeAddr(), sqle.Open(db)) + err = n.Start(ctx) + require.NoError(t, err) + + walletTerms := sqle.Duration(5 * time.Second) + userTerms := sqle.Duration(3 * time.Second) tests := []struct { name string @@ -46,15 +70,18 @@ func TestNode(t *testing.T) { name: "new_lock_should_work", run: func(re *require.Assertions) { var expected Lease - err := n.NewLock(LockRequest{ + req := LockRequest{ ID: "new_lock", Topic: "wallet", Key: "new_lock", - }, &expected) + TTL: walletTerms.Duration(), + } + + err := n.NewLock(req, &expected) re.NoError(err) - actual, err := n.getLease("wallet:new_lock") + actual, err := n.getLease("wallet", "new_lock") re.NoError(err) re.Equal(expected, actual) re.Equal(walletTerms, actual.TTL) @@ -68,20 +95,21 @@ func TestNode(t *testing.T) { Key: "new_lock", }, &expected) - re.ErrorIs(err, ErrAlreadyLocked) + re.ErrorIs(err, ErrLeaseExists) err = n.NewLock(LockRequest{ ID: "new_lock_3", Topic: "no_exists_topic", Key: "new_lock", + TTL: DefaultLeaseTerm, }, &expected) re.NoError(err) - actual, err = n.getLease("no_exists_topic:new_lock") + actual, err = n.getLease("no_exists_topic", "new_lock") re.NoError(err) - re.Equal(DefaultLeaseTerm, actual.TTL) + re.Equal(DefaultLeaseTerm, actual.TTL.Duration()) re.Equal("new_lock_3", actual.Lessee) re.Equal("no_exists_topic", actual.Topic) re.Equal("new_lock", actual.Key) @@ -95,11 +123,12 @@ func TestNode(t *testing.T) { ID: "renew_lock", Topic: "user", Key: "renew_lock", + TTL: userTerms.Duration(), }, &expected) re.NoError(err) - actual, err := n.getLease("user:renew_lock") + actual, err := n.getLease("user", "renew_lock") re.NoError(err) re.Equal(expected, actual) re.Equal(userTerms, actual.TTL) @@ -111,6 +140,7 @@ func TestNode(t *testing.T) { ID: "renew_lock_3", Topic: "user", Key: "renew_lock", + TTL: userTerms.Duration(), }, &expected) re.ErrorIs(err, ErrNotYourLease) @@ -119,11 +149,12 @@ func TestNode(t *testing.T) { ID: "renew_lock", Topic: "user", Key: "renew_lock", + TTL: userTerms.Duration(), }, &expected) re.NoError(err) - actual, err = n.getLease("user:renew_lock") + actual, err = n.getLease("user", "renew_lock") re.NoError(err) re.Equal(expected, actual) re.Equal(userTerms, actual.TTL) @@ -139,7 +170,7 @@ func TestNode(t *testing.T) { re.ErrorIs(err, ErrNoLease) - time.Sleep(userTerms) + time.Sleep(userTerms.Duration()) err = n.RenewLock(LockRequest{ ID: "renew_lock", Topic: "user", @@ -158,11 +189,12 @@ func TestNode(t *testing.T) { ID: "release_lock", Topic: "wallet", Key: "release_lock", + TTL: walletTerms.Duration(), }, &expected) re.NoError(err) - actual, err := n.getLease("wallet:release_lock") + actual, err := n.getLease("wallet", "release_lock") re.NoError(err) re.Equal(expected, actual) re.Equal(walletTerms, actual.TTL) @@ -186,44 +218,11 @@ func TestNode(t *testing.T) { re.NoError(err) - actual, err = n.getLease("wallet:release_lock") + actual, err = n.getLease("wallet", "release_lock") re.ErrorIs(err, ErrNoLease) }, }, - { - name: "terms_should_work", - run: func(re *require.Assertions) { - orderTerms := 4 * time.Second - var ok bool - - actual, err := n.getTerms("order") - re.NoError(err) - re.Equal(DefaultLeaseTerm, actual) - - err = n.SetTerms(TermsRequest{ - Topic: "order", - TTL: orderTerms, - }, &ok) - - re.NoError(err) - - actual, err = n.getTerms("order") - re.NoError(err) - re.Equal(orderTerms, actual) - - err = n.RemoveTerms(TermsRequest{ - Topic: "order", - }, &ok) - - re.NoError(err) - - actual, err = n.getTerms("order") - re.NoError(err) - re.Equal(DefaultLeaseTerm, actual) - - }, - }, } for _, test := range tests { @@ -233,129 +232,146 @@ func TestNode(t *testing.T) { } } -func TestLeader(t *testing.T) { +func TestTopic(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - n1 := NewNode(getFreeAddr(), WithRaft("1", getFreeAddr())) - err := n1.Start(ctx) - require.NoError(t, err) - err = n1.Serve(ctx) + db, clean, err := createSqlite3() require.NoError(t, err) + defer clean() - n2 := NewNode(getFreeAddr(), WithRaft("2", getFreeAddr())) - err = n2.Start(ctx) + n := NewNode(getFreeAddr(), sqle.Open(db)) + err = n.Start(ctx) require.NoError(t, err) - err = n2.Serve(ctx) - require.NoError(t, err) - - c := &Cluster{} - err = c.Start(n1) - require.NoError(t, err) - - err = c.Join(ctx, Peer{Addr: n2.addr, RaftID: n2.raftID, RaftAddr: n2.raftAddr}) - require.NoError(t, err) - - nodes := map[string]*Node{ - "1": n1, - "2": n2, - } + terms := sqle.Duration(5 * time.Second) tests := []struct { name string run func(*require.Assertions) }{ { - name: "leader_should_work", + name: "freeze_should_work", run: func(re *require.Assertions) { var expected Lease - _, id := n1.raft.LeaderWithID() - n := nodes[string(id)] - err := n.NewLock(LockRequest{ - ID: "leader", - Topic: "wallet", - Key: "leader", + ID: "freeze", + Topic: "freeze", + Key: "freeze", + TTL: terms.Duration(), }, &expected) re.NoError(err) - err = n.RenewLock(LockRequest{ - ID: "leader", - Topic: "wallet", - Key: "leader", - }, &expected) + actual, err := n.getLease("freeze", "freeze") re.NoError(err) + re.Equal(expected, actual) + re.Equal(terms, actual.TTL) + re.Equal("freeze", actual.Lessee) + re.Equal("freeze", actual.Topic) + re.Equal("freeze", actual.Key) + var ok bool - err = n.ReleaseLock(LockRequest{ - ID: "leader", - Topic: "wallet", - Key: "leader", - }, &ok) + err = n.Freeze("freeze", &ok) re.NoError(err) - err = n.SetTerms(TermsRequest{ - Topic: "leader", - TTL: DefaultLeaseTerm, - }, &ok) + err = n.RenewLock(LockRequest{ + ID: "freeze", + Topic: "freeze", + Key: "freeze", + }, &expected) + re.ErrorIs(err, ErrFrozenTopic) + err = n.NewLock(LockRequest{ + ID: "freeze", + Topic: "freeze", + Key: "freeze_2", + }, &expected) + + re.ErrorIs(err, ErrFrozenTopic) + + err = n.Reset("freeze", &ok) re.NoError(err) - err = n.RemoveTerms(TermsRequest{ - Topic: "leader", - }, &ok) + err = n.NewLock(LockRequest{ + ID: "freeze", + Topic: "freeze", + Key: "freeze_3", + TTL: terms.Duration(), + }, &expected) re.NoError(err) + + actual, err = n.getLease("freeze", "freeze_3") + re.NoError(err) + + re.Equal(terms, actual.TTL) + re.Equal("freeze", actual.Lessee) + re.Equal("freeze", actual.Topic) + re.Equal("freeze_3", actual.Key) }, }, { - name: "not_leader_should_not_work", + name: "reset_should_work", run: func(re *require.Assertions) { var expected Lease - _, id := n1.raft.LeaderWithID() - var n *Node - if id == "1" { - n = n2 - } else { - n = n1 - } + var ok bool + err = n.Freeze("reset", &ok) + re.NoError(err) - err := n.NewLock(LockRequest{ - ID: "follower", - Topic: "wallet", - Key: "follower", + err = n.NewLock(LockRequest{ + ID: "reset", + Topic: "reset", + Key: "reset", }, &expected) - re.ErrorIs(err, ErrNotRaftLeader) + re.ErrorIs(err, ErrFrozenTopic) err = n.RenewLock(LockRequest{ - ID: "follower", - Topic: "wallet", - Key: "follower", + ID: "reset", + Topic: "reset", + Key: "reset", }, &expected) - re.ErrorIs(err, ErrNotRaftLeader) + re.ErrorIs(err, ErrFrozenTopic) - var ok bool - err = n.ReleaseLock(LockRequest{ - ID: "follower", - Topic: "wallet", - Key: "follower", - }, &ok) - re.ErrorIs(err, ErrNotRaftLeader) + err = n.Reset("reset", &ok) + re.NoError(err) - err = n.SetTerms(TermsRequest{ - Topic: "follower", - TTL: DefaultLeaseTerm, - }, &ok) - re.ErrorIs(err, ErrNotRaftLeader) + err = n.NewLock(LockRequest{ + ID: "reset", + Topic: "reset", + Key: "reset", + TTL: terms.Duration(), + }, &expected) - err = n.RemoveTerms(TermsRequest{ - Topic: "follower", - }, &ok) - re.ErrorIs(err, ErrNotRaftLeader) + re.NoError(err) + + actual, err := n.getLease("reset", "reset") + re.NoError(err) + + re.Equal(terms, actual.TTL) + re.Equal("reset", actual.Lessee) + re.Equal("reset", actual.Topic) + re.Equal("reset", actual.Key) + + err = n.RenewLock(LockRequest{ + ID: "reset", + Topic: "reset", + Key: "reset", + TTL: 10 * time.Second, + }, &expected) + + re.NoError(err) + + actual, err = n.getLease("reset", "reset") + re.NoError(err) + + re.Equal(10*time.Second, actual.TTL.Duration()) + re.Equal("reset", actual.Lessee) + re.Equal("reset", actual.Topic) + re.Equal("reset", actual.Key) }, }, diff --git a/request.go b/request.go index aad2432..fd8b8f8 100644 --- a/request.go +++ b/request.go @@ -6,9 +6,5 @@ type LockRequest struct { ID string Topic string Key string -} - -type TermsRequest struct { - Topic string TTL time.Duration } diff --git a/snapshot.go b/snapshot.go deleted file mode 100644 index 01c76b4..0000000 --- a/snapshot.go +++ /dev/null @@ -1,40 +0,0 @@ -package dlm - -import ( - "encoding/json" - "log/slog" - - "github.com/hashicorp/raft" -) - -type snapshot struct { - store map[string][]byte -} - -func (s *snapshot) Persist(sink raft.SnapshotSink) error { - err := func() error { - // Encode data. - b, err := json.Marshal(s.store) - if err != nil { - return err - } - - // Write data to sink. - if _, err := sink.Write(b); err != nil { - return err - } - - // Close the sink. - return sink.Close() - }() - - if err != nil { - if e := sink.Cancel(); e != nil { - Logger.Warn("cancel snapshot sink", slog.String("err", e.Error())) - } - } - - return err -} - -func (*snapshot) Release() {}