Skip to content

Commit

Permalink
feat(simplefs): handle max size
Browse files Browse the repository at this point in the history
  • Loading branch information
darkweak committed Dec 2, 2024
1 parent 88bb38d commit 638cce8
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 14 deletions.
1 change: 1 addition & 0 deletions core/registered.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
var registered = sync.Map{}

func RegisterStorage(s Storer) {
s.Init()

Check failure on line 11 in core/registered.go

View workflow job for this annotation

GitHub Actions / Validate quality (core)

Error return value of `s.Init` is not checked (errcheck)
registered.Store(fmt.Sprintf("%s-%s", s.Name(), s.Uuid()), s)
}

Expand Down
95 changes: 81 additions & 14 deletions simplefs/simplefs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"path/filepath"
"regexp"
"strings"
"sync"
"time"

"github.com/darkweak/storages/core"
Expand All @@ -20,11 +21,14 @@ import (

// Simplefs provider type.
type Simplefs struct {
cache *ttlcache.Cache[string, []byte]
stale time.Duration
size int
path string
logger core.Logger
cache *ttlcache.Cache[string, []byte]
stale time.Duration
size int
path string
logger core.Logger
actualSize int64
directorySize int64
mu sync.Mutex
}

func onEvict(path string) error {
Expand All @@ -33,8 +37,11 @@ func onEvict(path string) error {

// Factory function create new Simplefs instance.
func Factory(simplefsCfg core.CacheProvider, logger core.Logger, stale time.Duration) (core.Storer, error) {
var directorySize int64

storagePath := simplefsCfg.Path
size := 0
directorySize = -1

simplefsConfiguration := simplefsCfg.Configuration
if simplefsConfiguration != nil {
Expand All @@ -52,6 +59,14 @@ func Factory(simplefsCfg core.CacheProvider, logger core.Logger, stale time.Dura
storagePath = val
}
}

if v, found := sfsconfig["directory_size"]; found && v != nil {
if val, ok := v.(int64); ok && val > 0 {
directorySize = val
} else if val, ok := v.(float64); ok && val > 0 {
directorySize = int64(val)
}
}
}
}

Expand All @@ -73,12 +88,6 @@ func Factory(simplefsCfg core.CacheProvider, logger core.Logger, stale time.Dura
ttlcache.WithCapacity[string, []byte](uint64(size)),
)

cache.OnEviction(func(_ context.Context, _ ttlcache.EvictionReason, i *ttlcache.Item[string, []byte]) {
if err := onEvict(string(i.Value())); err != nil {
logger.Errorf("impossible to remove the file %s: %#v", i.Key(), err)
}
})

if cache == nil {
err = errors.New("Impossible to initialize the simplefs storage.")
logger.Error(err)
Expand All @@ -96,7 +105,7 @@ func Factory(simplefsCfg core.CacheProvider, logger core.Logger, stale time.Dura

go cache.Start()

return &Simplefs{cache: cache, logger: logger, path: storagePath, size: size, stale: stale}, nil
return &Simplefs{cache: cache, directorySize: directorySize, logger: logger, mu: sync.Mutex{}, path: storagePath, size: size, stale: stale}, nil
}

// Name returns the storer name.
Expand Down Expand Up @@ -134,7 +143,7 @@ func (provider *Simplefs) ListKeys() []string {
func (provider *Simplefs) Get(key string) []byte {
result := provider.cache.Get(key)
if result == nil {
provider.logger.Errorf("Impossible to get the key %s in Simplefs", key)
provider.logger.Warnf("Impossible to get the key %s in Simplefs", key)

return nil
}
Expand Down Expand Up @@ -163,6 +172,22 @@ func (provider *Simplefs) GetMultiLevel(key string, req *http.Request, validator
return fresh, stale
}

func (provider *Simplefs) recoverEnoughSpaceIfNeeded(size int64) {
if provider.directorySize > -1 && provider.actualSize+size > provider.directorySize {
provider.cache.RangeBackwards(func(item *ttlcache.Item[string, []byte]) bool {
// Remove the oldest item if there is not enough space.
// TODO: open a PR to expose a range that iterate on LRU items.

Check failure on line 179 in simplefs/simplefs.go

View workflow job for this annotation

GitHub Actions / Validate quality (simplefs)

simplefs.go:179: Line contains TODO/BUG/FIXME: "TODO: open a PR to expose a range that i..." (godox)
provider.cache.Delete(string(item.Value()))

return false
})

provider.recoverEnoughSpaceIfNeeded(size)
}

return

Check failure on line 188 in simplefs/simplefs.go

View workflow job for this annotation

GitHub Actions / Validate quality (simplefs)

S1023: redundant `return` statement (gosimple)
}

// SetMultiLevel tries to store the key with the given value and update the mapping key to store metadata.
func (provider *Simplefs) SetMultiLevel(baseKey, variedKey string, value []byte, variedHeaders http.Header, etag string, duration time.Duration, realKey string) error {
now := time.Now()
Expand All @@ -174,6 +199,8 @@ func (provider *Simplefs) SetMultiLevel(baseKey, variedKey string, value []byte,
return err
}

provider.recoverEnoughSpaceIfNeeded(int64(compressed.Len()))

joinedFP := filepath.Join(provider.path, url.PathEscape(variedKey))
//nolint:gosec
if err := os.WriteFile(joinedFP, compressed.Bytes(), 0o644); err != nil {
Expand All @@ -188,7 +215,7 @@ func (provider *Simplefs) SetMultiLevel(baseKey, variedKey string, value []byte,
item := provider.cache.Get(mappingKey)

if item == nil {
provider.logger.Errorf("Impossible to get the mapping key %s in Simplefs", mappingKey)
provider.logger.Warnf("Impossible to get the mapping key %s in Simplefs", mappingKey)

item = &ttlcache.Item[string, []byte]{}
}
Expand Down Expand Up @@ -240,6 +267,46 @@ func (provider *Simplefs) DeleteMany(key string) {

// Init method will.
func (provider *Simplefs) Init() error {
provider.cache.OnInsertion(func(_ context.Context, i *ttlcache.Item[string, []byte]) {

Check failure on line 270 in simplefs/simplefs.go

View workflow job for this annotation

GitHub Actions / Validate quality (simplefs)

parameter name 'i' is too short for the scope of its usage (varnamelen)
if strings.Contains(string(i.Key()), core.MappingKeyPrefix) {

Check failure on line 271 in simplefs/simplefs.go

View workflow job for this annotation

GitHub Actions / Validate quality (simplefs)

unnecessary conversion (unconvert)
return
}

info, err := os.Stat(string(i.Value()))
if err != nil {
provider.logger.Errorf("impossible to get the file size %s: %#v", i.Key(), err)

return
}

provider.mu.Lock()
provider.actualSize += info.Size()
provider.logger.Debugf("Actual size add: %d", provider.actualSize, info.Size())
provider.mu.Unlock()
})

provider.cache.OnEviction(func(_ context.Context, _ ttlcache.EvictionReason, i *ttlcache.Item[string, []byte]) {

Check failure on line 288 in simplefs/simplefs.go

View workflow job for this annotation

GitHub Actions / Validate quality (simplefs)

parameter name 'i' is too short for the scope of its usage (varnamelen)
if strings.Contains(string(i.Value()), core.MappingKeyPrefix) {
return
}

info, err := os.Stat(string(i.Value()))
if err != nil {
provider.logger.Errorf("impossible to get the file size %s: %#v", i.Key(), err)

return
}

provider.mu.Lock()
provider.actualSize -= info.Size()
provider.logger.Debugf("Actual size remove: %d", provider.actualSize, info.Size())
provider.mu.Unlock()

if err := onEvict(string(i.Value())); err != nil {
provider.logger.Errorf("impossible to remove the file %s: %#v", i.Key(), err)
}
})

return nil
}

Expand Down
24 changes: 24 additions & 0 deletions simplefs/simplefs_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package simplefs_test

import (
"fmt"
"net/http"
"testing"
"time"

Expand Down Expand Up @@ -133,3 +135,25 @@ func TestSimplefs_Init(t *testing.T) {
t.Error("Impossible to init Simplefs provider")
}
}

func TestSimplefs_EvictAfterXSeconds(t *testing.T) {
client, _ := getSimplefsInstance()
client.Init()

Check failure on line 141 in simplefs/simplefs_test.go

View workflow job for this annotation

GitHub Actions / Validate quality (simplefs)

Error return value of `client.Init` is not checked (errcheck)

for i := 0; i < 10; i++ {

Check failure on line 143 in simplefs/simplefs_test.go

View workflow job for this annotation

GitHub Actions / Validate quality (simplefs)

for loop can be changed to use an integer range (Go 1.22+) (intrange)
key := fmt.Sprintf("Test_%d", i)
_ = client.SetMultiLevel(key, key, []byte(baseValue), http.Header{}, "", 1*time.Second, key)
}

res := client.Get("Test_0")
if len(res) != 0 {
t.Errorf("Key %s should be evicted", "Test_0")
}

res = client.Get("Test_9")
if len(res) == 0 {
t.Errorf("Key %s should exist", "Test_9")
}

time.Sleep(3 * time.Second)
}

0 comments on commit 638cce8

Please sign in to comment.