From 638cce88511032aca8165735105b9905bd01c29b Mon Sep 17 00:00:00 2001 From: darkweak Date: Mon, 2 Dec 2024 19:43:53 +0100 Subject: [PATCH 1/3] feat(simplefs): handle max size --- core/registered.go | 1 + simplefs/simplefs.go | 95 +++++++++++++++++++++++++++++++++------ simplefs/simplefs_test.go | 24 ++++++++++ 3 files changed, 106 insertions(+), 14 deletions(-) diff --git a/core/registered.go b/core/registered.go index 136a58e..de89a09 100644 --- a/core/registered.go +++ b/core/registered.go @@ -8,6 +8,7 @@ import ( var registered = sync.Map{} func RegisterStorage(s Storer) { + s.Init() registered.Store(fmt.Sprintf("%s-%s", s.Name(), s.Uuid()), s) } diff --git a/simplefs/simplefs.go b/simplefs/simplefs.go index 345f4ad..e479ed0 100644 --- a/simplefs/simplefs.go +++ b/simplefs/simplefs.go @@ -11,6 +11,7 @@ import ( "path/filepath" "regexp" "strings" + "sync" "time" "github.com/darkweak/storages/core" @@ -20,11 +21,14 @@ import ( // Simplefs provider type. type Simplefs struct { - cache *ttlcache.Cache[string, []byte] - stale time.Duration - size int - path string - logger core.Logger + cache *ttlcache.Cache[string, []byte] + stale time.Duration + size int + path string + logger core.Logger + actualSize int64 + directorySize int64 + mu sync.Mutex } func onEvict(path string) error { @@ -33,8 +37,11 @@ func onEvict(path string) error { // Factory function create new Simplefs instance. func Factory(simplefsCfg core.CacheProvider, logger core.Logger, stale time.Duration) (core.Storer, error) { + var directorySize int64 + storagePath := simplefsCfg.Path size := 0 + directorySize = -1 simplefsConfiguration := simplefsCfg.Configuration if simplefsConfiguration != nil { @@ -52,6 +59,14 @@ func Factory(simplefsCfg core.CacheProvider, logger core.Logger, stale time.Dura storagePath = val } } + + if v, found := sfsconfig["directory_size"]; found && v != nil { + if val, ok := v.(int64); ok && val > 0 { + directorySize = val + } else if val, ok := v.(float64); ok && val > 0 { + directorySize = int64(val) + } + } } } @@ -73,12 +88,6 @@ func Factory(simplefsCfg core.CacheProvider, logger core.Logger, stale time.Dura ttlcache.WithCapacity[string, []byte](uint64(size)), ) - cache.OnEviction(func(_ context.Context, _ ttlcache.EvictionReason, i *ttlcache.Item[string, []byte]) { - if err := onEvict(string(i.Value())); err != nil { - logger.Errorf("impossible to remove the file %s: %#v", i.Key(), err) - } - }) - if cache == nil { err = errors.New("Impossible to initialize the simplefs storage.") logger.Error(err) @@ -96,7 +105,7 @@ func Factory(simplefsCfg core.CacheProvider, logger core.Logger, stale time.Dura go cache.Start() - return &Simplefs{cache: cache, logger: logger, path: storagePath, size: size, stale: stale}, nil + return &Simplefs{cache: cache, directorySize: directorySize, logger: logger, mu: sync.Mutex{}, path: storagePath, size: size, stale: stale}, nil } // Name returns the storer name. @@ -134,7 +143,7 @@ func (provider *Simplefs) ListKeys() []string { func (provider *Simplefs) Get(key string) []byte { result := provider.cache.Get(key) if result == nil { - provider.logger.Errorf("Impossible to get the key %s in Simplefs", key) + provider.logger.Warnf("Impossible to get the key %s in Simplefs", key) return nil } @@ -163,6 +172,22 @@ func (provider *Simplefs) GetMultiLevel(key string, req *http.Request, validator return fresh, stale } +func (provider *Simplefs) recoverEnoughSpaceIfNeeded(size int64) { + if provider.directorySize > -1 && provider.actualSize+size > provider.directorySize { + provider.cache.RangeBackwards(func(item *ttlcache.Item[string, []byte]) bool { + // Remove the oldest item if there is not enough space. + // TODO: open a PR to expose a range that iterate on LRU items. + provider.cache.Delete(string(item.Value())) + + return false + }) + + provider.recoverEnoughSpaceIfNeeded(size) + } + + return +} + // SetMultiLevel tries to store the key with the given value and update the mapping key to store metadata. func (provider *Simplefs) SetMultiLevel(baseKey, variedKey string, value []byte, variedHeaders http.Header, etag string, duration time.Duration, realKey string) error { now := time.Now() @@ -174,6 +199,8 @@ func (provider *Simplefs) SetMultiLevel(baseKey, variedKey string, value []byte, return err } + provider.recoverEnoughSpaceIfNeeded(int64(compressed.Len())) + joinedFP := filepath.Join(provider.path, url.PathEscape(variedKey)) //nolint:gosec if err := os.WriteFile(joinedFP, compressed.Bytes(), 0o644); err != nil { @@ -188,7 +215,7 @@ func (provider *Simplefs) SetMultiLevel(baseKey, variedKey string, value []byte, item := provider.cache.Get(mappingKey) if item == nil { - provider.logger.Errorf("Impossible to get the mapping key %s in Simplefs", mappingKey) + provider.logger.Warnf("Impossible to get the mapping key %s in Simplefs", mappingKey) item = &ttlcache.Item[string, []byte]{} } @@ -240,6 +267,46 @@ func (provider *Simplefs) DeleteMany(key string) { // Init method will. func (provider *Simplefs) Init() error { + provider.cache.OnInsertion(func(_ context.Context, i *ttlcache.Item[string, []byte]) { + if strings.Contains(string(i.Key()), core.MappingKeyPrefix) { + return + } + + info, err := os.Stat(string(i.Value())) + if err != nil { + provider.logger.Errorf("impossible to get the file size %s: %#v", i.Key(), err) + + return + } + + provider.mu.Lock() + provider.actualSize += info.Size() + provider.logger.Debugf("Actual size add: %d", provider.actualSize, info.Size()) + provider.mu.Unlock() + }) + + provider.cache.OnEviction(func(_ context.Context, _ ttlcache.EvictionReason, i *ttlcache.Item[string, []byte]) { + if strings.Contains(string(i.Value()), core.MappingKeyPrefix) { + return + } + + info, err := os.Stat(string(i.Value())) + if err != nil { + provider.logger.Errorf("impossible to get the file size %s: %#v", i.Key(), err) + + return + } + + provider.mu.Lock() + provider.actualSize -= info.Size() + provider.logger.Debugf("Actual size remove: %d", provider.actualSize, info.Size()) + provider.mu.Unlock() + + if err := onEvict(string(i.Value())); err != nil { + provider.logger.Errorf("impossible to remove the file %s: %#v", i.Key(), err) + } + }) + return nil } diff --git a/simplefs/simplefs_test.go b/simplefs/simplefs_test.go index c1bf38b..5ba5552 100644 --- a/simplefs/simplefs_test.go +++ b/simplefs/simplefs_test.go @@ -1,6 +1,8 @@ package simplefs_test import ( + "fmt" + "net/http" "testing" "time" @@ -133,3 +135,25 @@ func TestSimplefs_Init(t *testing.T) { t.Error("Impossible to init Simplefs provider") } } + +func TestSimplefs_EvictAfterXSeconds(t *testing.T) { + client, _ := getSimplefsInstance() + client.Init() + + for i := 0; i < 10; i++ { + key := fmt.Sprintf("Test_%d", i) + _ = client.SetMultiLevel(key, key, []byte(baseValue), http.Header{}, "", 1*time.Second, key) + } + + res := client.Get("Test_0") + if len(res) != 0 { + t.Errorf("Key %s should be evicted", "Test_0") + } + + res = client.Get("Test_9") + if len(res) == 0 { + t.Errorf("Key %s should exist", "Test_9") + } + + time.Sleep(3 * time.Second) +} From 75cc6df58649b5e877baba094bbb1072a221c667 Mon Sep 17 00:00:00 2001 From: darkweak Date: Sun, 8 Dec 2024 12:51:30 +0100 Subject: [PATCH 2/3] fix(ci): golangci --- core/registered.go | 2 +- simplefs/simplefs.go | 23 +++++++++++------------ simplefs/simplefs_test.go | 4 ++-- 3 files changed, 14 insertions(+), 15 deletions(-) diff --git a/core/registered.go b/core/registered.go index de89a09..e0678df 100644 --- a/core/registered.go +++ b/core/registered.go @@ -8,7 +8,7 @@ import ( var registered = sync.Map{} func RegisterStorage(s Storer) { - s.Init() + _ = s.Init() registered.Store(fmt.Sprintf("%s-%s", s.Name(), s.Uuid()), s) } diff --git a/simplefs/simplefs.go b/simplefs/simplefs.go index e479ed0..a9217c7 100644 --- a/simplefs/simplefs.go +++ b/simplefs/simplefs.go @@ -176,6 +176,7 @@ func (provider *Simplefs) recoverEnoughSpaceIfNeeded(size int64) { if provider.directorySize > -1 && provider.actualSize+size > provider.directorySize { provider.cache.RangeBackwards(func(item *ttlcache.Item[string, []byte]) bool { // Remove the oldest item if there is not enough space. + //nolint:godox // TODO: open a PR to expose a range that iterate on LRU items. provider.cache.Delete(string(item.Value())) @@ -184,8 +185,6 @@ func (provider *Simplefs) recoverEnoughSpaceIfNeeded(size int64) { provider.recoverEnoughSpaceIfNeeded(size) } - - return } // SetMultiLevel tries to store the key with the given value and update the mapping key to store metadata. @@ -267,14 +266,14 @@ func (provider *Simplefs) DeleteMany(key string) { // Init method will. func (provider *Simplefs) Init() error { - provider.cache.OnInsertion(func(_ context.Context, i *ttlcache.Item[string, []byte]) { - if strings.Contains(string(i.Key()), core.MappingKeyPrefix) { + provider.cache.OnInsertion(func(_ context.Context, item *ttlcache.Item[string, []byte]) { + if strings.Contains(item.Key(), core.MappingKeyPrefix) { return } - info, err := os.Stat(string(i.Value())) + info, err := os.Stat(string(item.Value())) if err != nil { - provider.logger.Errorf("impossible to get the file size %s: %#v", i.Key(), err) + provider.logger.Errorf("impossible to get the file size %s: %#v", item.Key(), err) return } @@ -285,14 +284,14 @@ func (provider *Simplefs) Init() error { provider.mu.Unlock() }) - provider.cache.OnEviction(func(_ context.Context, _ ttlcache.EvictionReason, i *ttlcache.Item[string, []byte]) { - if strings.Contains(string(i.Value()), core.MappingKeyPrefix) { + provider.cache.OnEviction(func(_ context.Context, _ ttlcache.EvictionReason, item *ttlcache.Item[string, []byte]) { + if strings.Contains(string(item.Value()), core.MappingKeyPrefix) { return } - info, err := os.Stat(string(i.Value())) + info, err := os.Stat(string(item.Value())) if err != nil { - provider.logger.Errorf("impossible to get the file size %s: %#v", i.Key(), err) + provider.logger.Errorf("impossible to get the file size %s: %#v", item.Key(), err) return } @@ -302,8 +301,8 @@ func (provider *Simplefs) Init() error { provider.logger.Debugf("Actual size remove: %d", provider.actualSize, info.Size()) provider.mu.Unlock() - if err := onEvict(string(i.Value())); err != nil { - provider.logger.Errorf("impossible to remove the file %s: %#v", i.Key(), err) + if err := onEvict(string(item.Value())); err != nil { + provider.logger.Errorf("impossible to remove the file %s: %#v", item.Key(), err) } }) diff --git a/simplefs/simplefs_test.go b/simplefs/simplefs_test.go index 5ba5552..40bcf09 100644 --- a/simplefs/simplefs_test.go +++ b/simplefs/simplefs_test.go @@ -138,9 +138,9 @@ func TestSimplefs_Init(t *testing.T) { func TestSimplefs_EvictAfterXSeconds(t *testing.T) { client, _ := getSimplefsInstance() - client.Init() + _ = client.Init() - for i := 0; i < 10; i++ { + for i := range 10 { key := fmt.Sprintf("Test_%d", i) _ = client.SetMultiLevel(key, key, []byte(baseValue), http.Header{}, "", 1*time.Second, key) } From 08ca1166d784ef00cc8909f65ce5b9feda12f0e2 Mon Sep 17 00:00:00 2001 From: darkweak Date: Sun, 8 Dec 2024 13:13:41 +0100 Subject: [PATCH 3/3] feat(simplefs): supports go-humanize to configure with xxxMB --- simplefs/simplefs.go | 16 ++++++++++++++++ simplefs/simplefs_test.go | 3 ++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/simplefs/simplefs.go b/simplefs/simplefs.go index a9217c7..3f74b05 100644 --- a/simplefs/simplefs.go +++ b/simplefs/simplefs.go @@ -15,6 +15,7 @@ import ( "time" "github.com/darkweak/storages/core" + "github.com/dustin/go-humanize" "github.com/jellydator/ttlcache/v3" lz4 "github.com/pierrec/lz4/v4" ) @@ -65,6 +66,10 @@ func Factory(simplefsCfg core.CacheProvider, logger core.Logger, stale time.Dura directorySize = val } else if val, ok := v.(float64); ok && val > 0 { directorySize = int64(val) + } else if val, ok := v.(string); ok && val != "" { + s, _ := humanize.ParseBytes(val) + //nolint:gosec + directorySize = int64(s) } } } @@ -306,6 +311,17 @@ func (provider *Simplefs) Init() error { } }) + files, _ := os.ReadDir(provider.path) + provider.logger.Debugf("Regenerating simplefs cache from files in the given directory.") + + for _, f := range files { + if !f.IsDir() { + info, _ := f.Info() + provider.actualSize += info.Size() + provider.logger.Debugf("Add %v bytes to the actual size, sum to %v bytes.", info.Size(), provider.actualSize) + } + } + return nil } diff --git a/simplefs/simplefs_test.go b/simplefs/simplefs_test.go index 40bcf09..61d9c97 100644 --- a/simplefs/simplefs_test.go +++ b/simplefs/simplefs_test.go @@ -142,7 +142,8 @@ func TestSimplefs_EvictAfterXSeconds(t *testing.T) { for i := range 10 { key := fmt.Sprintf("Test_%d", i) - _ = client.SetMultiLevel(key, key, []byte(baseValue), http.Header{}, "", 1*time.Second, key) + _ = client.SetMultiLevel(key, key, []byte(baseValue), http.Header{}, "", time.Second, key) + time.Sleep(100 * time.Millisecond) } res := client.Get("Test_0")