From 37dc00f3c3feaf513a47c8523cfeaae3c64e34ec Mon Sep 17 00:00:00 2001 From: Hasit Mistry Date: Thu, 7 Jul 2022 11:53:19 -0700 Subject: [PATCH 1/4] Honor current TTL in DMap.Incr and DMap.Decr methods Since Incr and Decr operations internally do a Get followed by Put, the TTL value continuously . This change brings the following 2 changes to honor the TTL: 1. Update DMap.loadCurrentAtomicInt to return current TTL value from entry 2. Update DMAP.atomicIncrDecr to received the current TTL after calling DMap.loadCurrentAtomicInt and then update e.putConfig before putting the entry. This update signifies that the put contains an expiry and passes the new expiry value. --- internal/dmap/atomic.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/internal/dmap/atomic.go b/internal/dmap/atomic.go index 34d66bfa..4c725211 100644 --- a/internal/dmap/atomic.go +++ b/internal/dmap/atomic.go @@ -18,29 +18,31 @@ import ( "context" "errors" "fmt" + "time" + "github.com/buraksezer/olric/internal/protocol" "github.com/buraksezer/olric/internal/resp" "github.com/buraksezer/olric/internal/util" "github.com/buraksezer/olric/pkg/storage" ) -func (dm *DMap) loadCurrentAtomicInt(e *env) (int, error) { +func (dm *DMap) loadCurrentAtomicInt(e *env) (int, int64, error) { entry, err := dm.Get(e.ctx, e.key) if errors.Is(err, ErrKeyNotFound) { - return 0, nil + return 0, 0, nil } if err != nil { - return 0, err + return 0, 0, err } if entry == nil { - return 0, nil + return 0, 0, nil } nr, err := util.ParseInt(entry.Value(), 10, 64) if err != nil { - return 0, nil + return 0, 0, nil } - return int(nr), nil + return int(nr), entry.TTL(), nil } func (dm *DMap) atomicIncrDecr(cmd string, e *env, delta int) (int, error) { @@ -53,7 +55,7 @@ func (dm *DMap) atomicIncrDecr(cmd string, e *env, delta int) (int, error) { } }() - current, err := dm.loadCurrentAtomicInt(e) + current, ttl, err := dm.loadCurrentAtomicInt(e) if err != nil { return 0, err } @@ -79,6 +81,10 @@ func (dm *DMap) atomicIncrDecr(cmd string, e *env, delta int) (int, error) { pool.Put(valueBuf) }() + if ttl != 0 { + e.putConfig.HasEX = true + e.putConfig.EX = time.Until(time.UnixMilli(ttl)) + } err = dm.put(e) if err != nil { return 0, err From faae5b5006a3045e247f6d929d33a12ef71686e9 Mon Sep 17 00:00:00 2001 From: Hasit Mistry Date: Tue, 12 Jul 2022 23:34:34 -0700 Subject: [PATCH 2/4] Use PX instead of EX in atomicIncrDecr --- internal/dmap/atomic.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/dmap/atomic.go b/internal/dmap/atomic.go index 4c725211..99171357 100644 --- a/internal/dmap/atomic.go +++ b/internal/dmap/atomic.go @@ -82,8 +82,8 @@ func (dm *DMap) atomicIncrDecr(cmd string, e *env, delta int) (int, error) { }() if ttl != 0 { - e.putConfig.HasEX = true - e.putConfig.EX = time.Until(time.UnixMilli(ttl)) + e.putConfig.HasPX = true + e.putConfig.PX = time.Until(time.UnixMilli(ttl)) } err = dm.put(e) if err != nil { From 4e3c2a7ae0919839b18656412ce2a3c8d69e504a Mon Sep 17 00:00:00 2001 From: Hasit Mistry Date: Wed, 3 Aug 2022 14:46:43 -0700 Subject: [PATCH 3/4] Add test for dmap.loadCurrentAtomicInt method - Pass `context.Background()` to newEnv in `readRepair` intead of `nil` for the sake of being explicit - Update some test variable creations to use `:=` instead of `var` --- integration_test.go | 12 ++++++------ internal/dmap/atomic_test.go | 33 +++++++++++++++++++++++++++++++++ internal/dmap/get.go | 2 +- 3 files changed, 40 insertions(+), 7 deletions(-) diff --git a/integration_test.go b/integration_test.go index e29dfc19..5603b7a2 100644 --- a/integration_test.go +++ b/integration_test.go @@ -93,7 +93,7 @@ func TestIntegration_NodesJoinOrLeftDuringQuery(t *testing.T) { } func TestIntegration_DMap_Cache_Eviction_LRU_MaxKeys(t *testing.T) { - var maxKeys = 100000 + maxKeys := 100000 newConfig := func() *config.Config { c := config.New("local") c.PartitionCount = config.DefaultPartitionCount @@ -142,7 +142,7 @@ func TestIntegration_DMap_Cache_Eviction_LRU_MaxKeys(t *testing.T) { } func TestIntegration_DMap_Cache_Eviction_MaxKeys(t *testing.T) { - var maxKeys = 100000 + maxKeys := 100000 newConfig := func() *config.Config { c := config.New("local") c.PartitionCount = config.DefaultPartitionCount @@ -198,7 +198,7 @@ func TestIntegration_DMap_Cache_Eviction_MaxKeys(t *testing.T) { } func TestIntegration_DMap_Cache_Eviction_MaxIdleDuration(t *testing.T) { - var maxKeys = 100000 + maxKeys := 100000 newConfig := func() *config.Config { c := config.New("local") c.PartitionCount = config.DefaultPartitionCount @@ -247,7 +247,7 @@ func TestIntegration_DMap_Cache_Eviction_MaxIdleDuration(t *testing.T) { } func TestIntegration_DMap_Cache_Eviction_TTLDuration(t *testing.T) { - var maxKeys = 100000 + maxKeys := 100000 newConfig := func() *config.Config { c := config.New("local") c.PartitionCount = config.DefaultPartitionCount @@ -285,7 +285,7 @@ func TestIntegration_DMap_Cache_Eviction_TTLDuration(t *testing.T) { var total int for i := 0; i < maxKeys; i++ { - _, err = dm.Get(ctx, fmt.Sprintf("mykey-%d", i)) + _, err := dm.Get(ctx, fmt.Sprintf("mykey-%d", i)) if err == ErrKeyNotFound { err = nil total++ @@ -296,7 +296,7 @@ func TestIntegration_DMap_Cache_Eviction_TTLDuration(t *testing.T) { } func TestIntegration_DMap_Cache_Eviction_LRU_MaxInuse(t *testing.T) { - var maxKeys = 100000 + maxKeys := 100000 newConfig := func() *config.Config { c := config.New("local") c.PartitionCount = config.DefaultPartitionCount diff --git a/internal/dmap/atomic_test.go b/internal/dmap/atomic_test.go index b2e3ba3b..cf911947 100644 --- a/internal/dmap/atomic_test.go +++ b/internal/dmap/atomic_test.go @@ -20,6 +20,7 @@ import ( "sync" "sync/atomic" "testing" + "time" "github.com/buraksezer/olric/internal/protocol" "github.com/buraksezer/olric/internal/resp" @@ -29,6 +30,38 @@ import ( "golang.org/x/sync/errgroup" ) +func TestDMap_loadCurrentAtomicInt(t *testing.T) { + cluster := testcluster.New(NewService) + s := cluster.AddMember(nil).(*Service) + defer cluster.Shutdown() + + ctx := context.Background() + key := "incr" + + ttlDuration := time.Second * 5 + s.config.DMaps.TTLDuration = time.Second * 5 + + dm, err := s.NewDMap("atomic_test") + require.NoError(t, err) + + _, err = dm.Incr(ctx, key, 1) + if err != nil { + s.log.V(2).Printf("[ERROR] Failed to call Incr: %v", err) + return + } + + e := newEnv(ctx) + e.dmap = dm.name + e.key = key + _, ttl, err := dm.loadCurrentAtomicInt(e) + require.NoError(t, err) + + timePassed := time.Millisecond * 500 + <-time.After(timePassed) + now := time.Now() + require.WithinDuration(t, time.UnixMilli(ttl), now, ttlDuration) +} + func TestDMap_Atomic_Incr(t *testing.T) { cluster := testcluster.New(NewService) s := cluster.AddMember(nil).(*Service) diff --git a/internal/dmap/get.go b/internal/dmap/get.go index 52423ea1..9aa769e1 100644 --- a/internal/dmap/get.go +++ b/internal/dmap/get.go @@ -249,7 +249,7 @@ func (dm *DMap) readRepair(winner *version, versions []*version) { } f.Lock() - e := newEnv(nil) + e := newEnv(context.Background()) e.hkey = hkey e.fragment = f err = dm.putEntryOnFragment(e, winner.entry) From 137cf15c004665daf725f57599aa312784638fd1 Mon Sep 17 00:00:00 2001 From: Hasit Mistry Date: Wed, 3 Aug 2022 14:59:43 -0700 Subject: [PATCH 4/4] Remove unnecessary variables from TestDMap_loadCurrentAtomicInt test --- internal/dmap/atomic_test.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/internal/dmap/atomic_test.go b/internal/dmap/atomic_test.go index cf911947..b62d75a5 100644 --- a/internal/dmap/atomic_test.go +++ b/internal/dmap/atomic_test.go @@ -56,10 +56,8 @@ func TestDMap_loadCurrentAtomicInt(t *testing.T) { _, ttl, err := dm.loadCurrentAtomicInt(e) require.NoError(t, err) - timePassed := time.Millisecond * 500 - <-time.After(timePassed) - now := time.Now() - require.WithinDuration(t, time.UnixMilli(ttl), now, ttlDuration) + <-time.After(time.Millisecond * 500) + require.WithinDuration(t, time.UnixMilli(ttl), time.Now(), ttlDuration) } func TestDMap_Atomic_Incr(t *testing.T) {