From d983fa98d4da0df0690395097c58379d3ed86a18 Mon Sep 17 00:00:00 2001 From: Vicent Marti Date: Thu, 18 Mar 2021 18:14:13 +0100 Subject: [PATCH 1/2] cache: fix race when clearning a cache When clearing a cache while other goroutines are actively `Wait`ing for it, the waiting goroutines can become blocked indefinitely if one of the cache items is flushed by the `Clear` code instead of by the background worker, because the `Clear` code is not marking waitgroups as done. --- cache.go | 4 ++++ cache_test.go | 30 ++++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/cache.go b/cache.go index b37c9c96..7a19abad 100644 --- a/cache.go +++ b/cache.go @@ -348,6 +348,10 @@ loop: for { select { case i := <-c.setBuf: + if i.wg != nil { + i.wg.Done() + continue + } if i.flag != itemUpdate { // In itemUpdate, the value is already set in the store. So, no need to call // onEvict here. diff --git a/cache_test.go b/cache_test.go index 7d62386b..2fa8bc8f 100644 --- a/cache_test.go +++ b/cache_test.go @@ -715,6 +715,36 @@ func init() { bucketDurationSecs = 1 } +func TestBlockOnShutdown(t *testing.T) { + c, err := NewCache(&Config{ + NumCounters: 100, + MaxCost: 10, + BufferItems: 64, + Metrics: false, + }) + require.NoError(t, err) + + done := make(chan struct{}) + + go func() { + for i := 0; i < 10; i++ { + c.Wait() + } + close(done) + }() + + for i := 0; i < 10; i++ { + c.Clear() + } + + select { + case <-done: + // We're OK + case <-time.After(1 * time.Second): + t.Fatalf("timed out while waiting on cache") + } +} + // Regression test for bug https://github.com/dgraph-io/ristretto/issues/167 func TestDropUpdates(t *testing.T) { originalSetBugSize := setBufSize From 13ca558989af307e38d258db749687b0b82bad2c Mon Sep 17 00:00:00 2001 From: Vicent Marti Date: Thu, 18 Mar 2021 18:21:35 +0100 Subject: [PATCH 2/2] cache: rename test --- cache_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cache_test.go b/cache_test.go index 2fa8bc8f..8b185f62 100644 --- a/cache_test.go +++ b/cache_test.go @@ -715,7 +715,7 @@ func init() { bucketDurationSecs = 1 } -func TestBlockOnShutdown(t *testing.T) { +func TestBlockOnClear(t *testing.T) { c, err := NewCache(&Config{ NumCounters: 100, MaxCost: 10, @@ -723,6 +723,7 @@ func TestBlockOnShutdown(t *testing.T) { Metrics: false, }) require.NoError(t, err) + defer c.Close() done := make(chan struct{})