From aa2bd6679403ca43a0b990752e2e4d9cc0448629 Mon Sep 17 00:00:00 2001 From: Lz Date: Fri, 19 Apr 2024 23:36:55 +0800 Subject: [PATCH 1/9] fix(conn): fixed goroutine leak issue --- conn.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/conn.go b/conn.go index 79d6653..5e66880 100644 --- a/conn.go +++ b/conn.go @@ -5,6 +5,7 @@ import ( "errors" "net" "net/rpc" + "sync" "time" ) @@ -26,24 +27,24 @@ func connect(ctx context.Context, addr string, timeout time.Duration) (*rpc.Clie type Conn struct { ctx context.Context net.Conn + once sync.Once } -func (c *Conn) wait() { +func (c *Conn) waitContext() { // disabled deadline - c.Conn.SetReadDeadline(time.Time{}) // nolint: errcheck + c.Conn.SetDeadline(time.Time{}) // nolint: errcheck <-c.ctx.Done() err := c.ctx.Err() if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - c.Conn.SetReadDeadline(time.Unix(1, 0)) // nolint: errcheck + c.Conn.SetDeadline(time.Unix(1, 0)) // nolint: errcheck } } func (c *Conn) Read(p []byte) (n int, err error) { - go c.wait() + go c.once.Do(c.waitContext) return c.Conn.Read(p) } func (c *Conn) Write(p []byte) (n int, err error) { - go c.wait() return c.Conn.Write(p) } From dd5ae21c0d492e44c85fb1a0a0aae0f723a10dbb Mon Sep 17 00:00:00 2001 From: Lz Date: Fri, 19 Apr 2024 23:51:12 +0800 Subject: [PATCH 2/9] fix(conn): fixed goroutine leak issue --- conn.go | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/conn.go b/conn.go index 5e66880..93fd469 100644 --- a/conn.go +++ b/conn.go @@ -5,7 +5,6 @@ import ( "errors" "net" "net/rpc" - "sync" "time" ) @@ -20,14 +19,22 @@ func connect(ctx context.Context, addr string, timeout time.Duration) (*rpc.Clie return nil, err } - return rpc.NewClient(&Conn{ctx: ctx, Conn: c}), nil + conn := &Conn{ + Conn: c, + } + + conn.ctx, conn.cancel = context.WithCancel(ctx) + + go conn.waitContext() + + return rpc.NewClient(conn), nil } // Conn wrap net.Conn with context support type Conn struct { - ctx context.Context + ctx context.Context + cancel context.CancelFunc net.Conn - once sync.Once } func (c *Conn) waitContext() { @@ -41,10 +48,14 @@ func (c *Conn) waitContext() { } func (c *Conn) Read(p []byte) (n int, err error) { - go c.once.Do(c.waitContext) return c.Conn.Read(p) } func (c *Conn) Write(p []byte) (n int, err error) { return c.Conn.Write(p) } + +func (c *Conn) Close() error { + defer c.cancel() + return c.Conn.Close() +} From 3e767f91bcdb52b27a27fb25949fad7321892adc Mon Sep 17 00:00:00 2001 From: Lz Date: Fri, 19 Apr 2024 23:55:35 +0800 Subject: [PATCH 3/9] fix(conn): close rpc client --- mutex.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/mutex.go b/mutex.go index f4334cd..f221597 100644 --- a/mutex.go +++ b/mutex.go @@ -88,6 +88,7 @@ func (m *Mutex) Lock(ctx context.Context) error { for _, c := range cluster { a.Add(func(c *rpc.Client) func(ctx context.Context) (Lease, error) { + defer c.Close() return func(ctx context.Context) (Lease, error) { var t Lease err := c.Call("dlm.NewLock", req, &t) @@ -130,6 +131,7 @@ func (m *Mutex) Renew(ctx context.Context) error { for _, c := range cluster { a.Add(func(c *rpc.Client) func(ctx context.Context) (Lease, error) { + defer c.Close() return func(ctx context.Context) (Lease, error) { var t Lease err := c.Call("dlm.RenewLock", req, &t) @@ -174,6 +176,7 @@ func (m *Mutex) Unlock(ctx context.Context) error { for _, c := range cluster { a.Add(func(c *rpc.Client) func(ctx context.Context) error { + defer c.Close() return func(ctx context.Context) error { var t bool err := c.Call("dlm.ReleaseLock", req, &t) @@ -213,6 +216,7 @@ func (m *Mutex) Freeze(ctx context.Context, topic string) error { for _, c := range cluster { a.Add(func(c *rpc.Client) func(ctx context.Context) error { + defer c.Close() return func(ctx context.Context) error { var ok bool return c.Call("dlm.Freeze", topic, &ok) @@ -247,6 +251,7 @@ func (m *Mutex) Reset(ctx context.Context, topic string) error { for _, c := range cluster { a.Add(func(c *rpc.Client) func(ctx context.Context) error { + defer c.Close() return func(ctx context.Context) error { var ok bool return c.Call("dlm.Reset", topic, &ok) From 0e60e75805068bbe9c825fa60cb80867a9540e60 Mon Sep 17 00:00:00 2001 From: Lz Date: Fri, 19 Apr 2024 23:57:53 +0800 Subject: [PATCH 4/9] fix(conn): close rpc client --- mutex.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/mutex.go b/mutex.go index f221597..f64c7a4 100644 --- a/mutex.go +++ b/mutex.go @@ -88,8 +88,8 @@ func (m *Mutex) Lock(ctx context.Context) error { for _, c := range cluster { a.Add(func(c *rpc.Client) func(ctx context.Context) (Lease, error) { - defer c.Close() return func(ctx context.Context) (Lease, error) { + defer c.Close() var t Lease err := c.Call("dlm.NewLock", req, &t) return t, err @@ -131,8 +131,8 @@ func (m *Mutex) Renew(ctx context.Context) error { for _, c := range cluster { a.Add(func(c *rpc.Client) func(ctx context.Context) (Lease, error) { - defer c.Close() return func(ctx context.Context) (Lease, error) { + defer c.Close() var t Lease err := c.Call("dlm.RenewLock", req, &t) if err != nil { @@ -176,8 +176,8 @@ func (m *Mutex) Unlock(ctx context.Context) error { for _, c := range cluster { a.Add(func(c *rpc.Client) func(ctx context.Context) error { - defer c.Close() return func(ctx context.Context) error { + defer c.Close() var t bool err := c.Call("dlm.ReleaseLock", req, &t) if err != nil { @@ -216,8 +216,8 @@ func (m *Mutex) Freeze(ctx context.Context, topic string) error { for _, c := range cluster { a.Add(func(c *rpc.Client) func(ctx context.Context) error { - defer c.Close() return func(ctx context.Context) error { + defer c.Close() var ok bool return c.Call("dlm.Freeze", topic, &ok) } @@ -251,8 +251,8 @@ func (m *Mutex) Reset(ctx context.Context, topic string) error { for _, c := range cluster { a.Add(func(c *rpc.Client) func(ctx context.Context) error { - defer c.Close() return func(ctx context.Context) error { + defer c.Close() var ok bool return c.Call("dlm.Reset", topic, &ok) } From 42f5a5a4382da21a35f806a34302d6a13a53cdc1 Mon Sep 17 00:00:00 2001 From: Lz Date: Sat, 20 Apr 2024 00:05:17 +0800 Subject: [PATCH 5/9] chore(code): removed unused import --- conn.go | 1 - 1 file changed, 1 deletion(-) diff --git a/conn.go b/conn.go index 9369b56..0bd0d74 100644 --- a/conn.go +++ b/conn.go @@ -5,7 +5,6 @@ import ( "errors" "net" "net/rpc" - "sync" "time" ) From eb0350f27189dd866d7a9162a480fdf46fa5ed77 Mon Sep 17 00:00:00 2001 From: Lz Date: Sat, 20 Apr 2024 17:55:13 +0800 Subject: [PATCH 6/9] fix(mutex): improved time.After performance --- dlm.go | 1 + go.mod | 2 +- go.sum | 4 ++-- mutex.go | 40 +++++++++++++++++++++++++--------------- mutex_option.go | 6 ++++++ mutext_test.go | 2 +- 6 files changed, 36 insertions(+), 19 deletions(-) diff --git a/dlm.go b/dlm.go index 24aca7f..7c1836a 100644 --- a/dlm.go +++ b/dlm.go @@ -20,6 +20,7 @@ var ( var ( DefaultTimeout = 3 * time.Second DefaultLeaseTerm = 5 * time.Second + DefaultKeepAlive = 1 * time.Second Logger = slog.Default() ) diff --git a/go.mod b/go.mod index 7560b9f..1e4a60e 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/mattn/go-sqlite3 v1.14.22 github.com/stretchr/testify v1.9.0 github.com/yaitoo/async v1.0.4 - github.com/yaitoo/sqle v1.3.2 + github.com/yaitoo/sqle v1.4.5-0.20240420095412-e5fa84d2bf7d ) require ( diff --git a/go.sum b/go.sum index a48a599..9f65df0 100644 --- a/go.sum +++ b/go.sum @@ -15,8 +15,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yaitoo/async v1.0.4 h1:u+SWuJcSckgBOcMjMYz9IviojeCatDrdni3YNGLCiHY= github.com/yaitoo/async v1.0.4/go.mod h1:IpSO7Ei7AxiqLxFqDjN4rJaVlt8wm4ZxMXyyQaWmM1g= -github.com/yaitoo/sqle v1.3.2 h1:kuoAw2XPNvuPFJlUI1EdAknCajrf4aUNc4ns5TzPpXw= -github.com/yaitoo/sqle v1.3.2/go.mod h1:Bv1PPG6hYZP2In3WKN1dBqYNJiWP0ZSLs6uEkRo2c9M= +github.com/yaitoo/sqle v1.4.5-0.20240420095412-e5fa84d2bf7d h1:vx5TmDJim+D4janxO80zQSuG2aH5aOUuLx0OD5Qztnw= +github.com/yaitoo/sqle v1.4.5-0.20240420095412-e5fa84d2bf7d/go.mod h1:Bv1PPG6hYZP2In3WKN1dBqYNJiWP0ZSLs6uEkRo2c9M= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/mutex.go b/mutex.go index f64c7a4..09abe7b 100644 --- a/mutex.go +++ b/mutex.go @@ -16,11 +16,12 @@ import ( func New(id, topic, key string, options ...MutexOption) *Mutex { m := &Mutex{ - id: id, - topic: strings.ToLower(topic), - key: strings.ToLower(key), - timeout: DefaultTimeout, - ttl: DefaultLeaseTerm, + id: id, + topic: strings.ToLower(topic), + key: strings.ToLower(key), + timeout: DefaultTimeout, + ttl: DefaultLeaseTerm, + keepalive: DefaultKeepAlive, } for _, o := range options { @@ -36,12 +37,13 @@ func New(id, topic, key string, options ...MutexOption) *Mutex { type Mutex struct { mu sync.RWMutex - id string - topic string - key string - peers []string - timeout time.Duration - ttl time.Duration + id string + topic string + key string + peers []string + timeout time.Duration + ttl time.Duration + keepalive time.Duration consensus int @@ -105,7 +107,7 @@ func (m *Mutex) Lock(ctx context.Context) error { return m.Error(errs, err) } - t := result[0] + t := result[len(result)-1] if t.IsExpired(start) { return ErrExpiredLease @@ -153,7 +155,7 @@ func (m *Mutex) Renew(ctx context.Context) error { return m.Error(errs, err) } - t := result[0] + t := result[len(result)-1] if t.IsExpired(start) { return ErrExpiredLease } @@ -280,18 +282,24 @@ func (m *Mutex) createRequest() LockRequest { } func (m *Mutex) waitExpires() { + delay := time.NewTimer(1 * time.Second) + defer delay.Stop() var expiresOn time.Time + for { m.mu.RLock() expiresOn = m.lease.ExpiresOn m.mu.RUnlock() + delay.Stop() + delay.Reset(time.Until(expiresOn)) + select { case <-m.Context.Done(): return - case <-time.After(time.Until(expiresOn)): + case <-delay.C: // get latest ExpiresOn m.mu.RLock() expiresOn = m.lease.ExpiresOn @@ -306,13 +314,15 @@ func (m *Mutex) waitExpires() { } func (m *Mutex) Keepalive() { + delay := time.NewTicker(m.keepalive) + defer delay.Stop() var err error for { select { case <-m.Context.Done(): return - case <-time.After(1 * time.Second): + case <-delay.C: m.mu.RLock() expiresOn := m.lease.ExpiresOn m.mu.RUnlock() diff --git a/mutex_option.go b/mutex_option.go index 025ea5c..0b93cb7 100644 --- a/mutex_option.go +++ b/mutex_option.go @@ -23,3 +23,9 @@ func WithTimeout(d time.Duration) MutexOption { m.timeout = d } } + +func WithKeepAlive(d time.Duration) MutexOption { + return func(m *Mutex) { + m.keepalive = d + } +} diff --git a/mutext_test.go b/mutext_test.go index ebad4dd..48f8551 100644 --- a/mutext_test.go +++ b/mutext_test.go @@ -223,7 +223,7 @@ func TestRenew(t *testing.T) { name: "keepalive_should_work", run: func(r *require.Assertions) { ttl := 2 * time.Second - m := New("renew", "wallet", "renew_keepalive", WithPeers(peers...), WithTTL(ttl)) + m := New("renew", "wallet", "renew_keepalive", WithPeers(peers...), WithTTL(ttl), WithKeepAlive(1*time.Second)) err := m.Lock(context.TODO()) r.NoError(err) From c18f4b65f968ecc1a2225f6be2355b679b8bf4c1 Mon Sep 17 00:00:00 2001 From: Lz Date: Sat, 20 Apr 2024 20:16:43 +0800 Subject: [PATCH 7/9] chore(gomod): upgraded gomods --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 1e4a60e..7093e69 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/mattn/go-sqlite3 v1.14.22 github.com/stretchr/testify v1.9.0 github.com/yaitoo/async v1.0.4 - github.com/yaitoo/sqle v1.4.5-0.20240420095412-e5fa84d2bf7d + github.com/yaitoo/sqle v1.4.5-0.20240420121556-c4eb23e47566 ) require ( diff --git a/go.sum b/go.sum index 9f65df0..eac2888 100644 --- a/go.sum +++ b/go.sum @@ -15,8 +15,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yaitoo/async v1.0.4 h1:u+SWuJcSckgBOcMjMYz9IviojeCatDrdni3YNGLCiHY= github.com/yaitoo/async v1.0.4/go.mod h1:IpSO7Ei7AxiqLxFqDjN4rJaVlt8wm4ZxMXyyQaWmM1g= -github.com/yaitoo/sqle v1.4.5-0.20240420095412-e5fa84d2bf7d h1:vx5TmDJim+D4janxO80zQSuG2aH5aOUuLx0OD5Qztnw= -github.com/yaitoo/sqle v1.4.5-0.20240420095412-e5fa84d2bf7d/go.mod h1:Bv1PPG6hYZP2In3WKN1dBqYNJiWP0ZSLs6uEkRo2c9M= +github.com/yaitoo/sqle v1.4.5-0.20240420121556-c4eb23e47566 h1:oERhBE57LOG3zUiCCN/M8fWdFNTmzoFkwNMjDkPNW/4= +github.com/yaitoo/sqle v1.4.5-0.20240420121556-c4eb23e47566/go.mod h1:Bv1PPG6hYZP2In3WKN1dBqYNJiWP0ZSLs6uEkRo2c9M= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= From b977aa1c1e75d28b209ba95ca4c6daa68ceacfd4 Mon Sep 17 00:00:00 2001 From: Lz Date: Sat, 20 Apr 2024 20:49:21 +0800 Subject: [PATCH 8/9] chore(gomod): upgraded gomods --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 7093e69..9370cde 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/mattn/go-sqlite3 v1.14.22 github.com/stretchr/testify v1.9.0 github.com/yaitoo/async v1.0.4 - github.com/yaitoo/sqle v1.4.5-0.20240420121556-c4eb23e47566 + github.com/yaitoo/sqle v1.4.5 ) require ( diff --git a/go.sum b/go.sum index eac2888..551d9b2 100644 --- a/go.sum +++ b/go.sum @@ -15,8 +15,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yaitoo/async v1.0.4 h1:u+SWuJcSckgBOcMjMYz9IviojeCatDrdni3YNGLCiHY= github.com/yaitoo/async v1.0.4/go.mod h1:IpSO7Ei7AxiqLxFqDjN4rJaVlt8wm4ZxMXyyQaWmM1g= -github.com/yaitoo/sqle v1.4.5-0.20240420121556-c4eb23e47566 h1:oERhBE57LOG3zUiCCN/M8fWdFNTmzoFkwNMjDkPNW/4= -github.com/yaitoo/sqle v1.4.5-0.20240420121556-c4eb23e47566/go.mod h1:Bv1PPG6hYZP2In3WKN1dBqYNJiWP0ZSLs6uEkRo2c9M= +github.com/yaitoo/sqle v1.4.5 h1:2xWCNfGgCNisMsQgpCvMFuhaTkTFIvF+nkjEu6H41lU= +github.com/yaitoo/sqle v1.4.5/go.mod h1:Bv1PPG6hYZP2In3WKN1dBqYNJiWP0ZSLs6uEkRo2c9M= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= From ec222e9965df8a76c36a8d244258ccc650a51a99 Mon Sep 17 00:00:00 2001 From: Lz Date: Sat, 20 Apr 2024 20:56:05 +0800 Subject: [PATCH 9/9] fix(tests): fixed keepalive test --- mutext_test.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/mutext_test.go b/mutext_test.go index 48f8551..6134378 100644 --- a/mutext_test.go +++ b/mutext_test.go @@ -222,8 +222,11 @@ func TestRenew(t *testing.T) { { name: "keepalive_should_work", run: func(r *require.Assertions) { - ttl := 2 * time.Second - m := New("renew", "wallet", "renew_keepalive", WithPeers(peers...), WithTTL(ttl), WithKeepAlive(1*time.Second)) + ttl := 3 * time.Second + m := New("renew", "wallet", "renew_keepalive", + WithPeers(peers...), + WithTTL(ttl), + WithKeepAlive(1*time.Second)) err := m.Lock(context.TODO()) r.NoError(err) @@ -234,7 +237,7 @@ func TestRenew(t *testing.T) { go m.Keepalive() - time.Sleep(ttl) + time.Sleep(ttl + 1*time.Second) err = m.Renew(context.TODO()) r.NoError(err)