Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(simplefs): handle max size #20

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/registered.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
var registered = sync.Map{}

func RegisterStorage(s Storer) {
_ = s.Init()
registered.Store(fmt.Sprintf("%s-%s", s.Name(), s.Uuid()), s)
}

Expand Down
110 changes: 96 additions & 14 deletions simplefs/simplefs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,25 @@ import (
"path/filepath"
"regexp"
"strings"
"sync"
"time"

"github.com/darkweak/storages/core"
"github.com/dustin/go-humanize"
"github.com/jellydator/ttlcache/v3"
lz4 "github.com/pierrec/lz4/v4"
)

// Simplefs provider type.
type Simplefs struct {
cache *ttlcache.Cache[string, []byte]
stale time.Duration
size int
path string
logger core.Logger
cache *ttlcache.Cache[string, []byte]
stale time.Duration
size int
path string
logger core.Logger
actualSize int64
directorySize int64
mu sync.Mutex
}

func onEvict(path string) error {
Expand All @@ -33,8 +38,11 @@ func onEvict(path string) error {

// Factory function create new Simplefs instance.
func Factory(simplefsCfg core.CacheProvider, logger core.Logger, stale time.Duration) (core.Storer, error) {
var directorySize int64

storagePath := simplefsCfg.Path
size := 0
directorySize = -1

simplefsConfiguration := simplefsCfg.Configuration
if simplefsConfiguration != nil {
Expand All @@ -52,6 +60,18 @@ func Factory(simplefsCfg core.CacheProvider, logger core.Logger, stale time.Dura
storagePath = val
}
}

if v, found := sfsconfig["directory_size"]; found && v != nil {
if val, ok := v.(int64); ok && val > 0 {
directorySize = val
} else if val, ok := v.(float64); ok && val > 0 {
directorySize = int64(val)
} else if val, ok := v.(string); ok && val != "" {
s, _ := humanize.ParseBytes(val)
//nolint:gosec
directorySize = int64(s)
}
}
}
}

Expand All @@ -73,12 +93,6 @@ func Factory(simplefsCfg core.CacheProvider, logger core.Logger, stale time.Dura
ttlcache.WithCapacity[string, []byte](uint64(size)),
)

cache.OnEviction(func(_ context.Context, _ ttlcache.EvictionReason, i *ttlcache.Item[string, []byte]) {
if err := onEvict(string(i.Value())); err != nil {
logger.Errorf("impossible to remove the file %s: %#v", i.Key(), err)
}
})

if cache == nil {
err = errors.New("Impossible to initialize the simplefs storage.")
logger.Error(err)
Expand All @@ -96,7 +110,7 @@ func Factory(simplefsCfg core.CacheProvider, logger core.Logger, stale time.Dura

go cache.Start()

return &Simplefs{cache: cache, logger: logger, path: storagePath, size: size, stale: stale}, nil
return &Simplefs{cache: cache, directorySize: directorySize, logger: logger, mu: sync.Mutex{}, path: storagePath, size: size, stale: stale}, nil
}

// Name returns the storer name.
Expand Down Expand Up @@ -134,7 +148,7 @@ func (provider *Simplefs) ListKeys() []string {
func (provider *Simplefs) Get(key string) []byte {
result := provider.cache.Get(key)
if result == nil {
provider.logger.Errorf("Impossible to get the key %s in Simplefs", key)
provider.logger.Warnf("Impossible to get the key %s in Simplefs", key)

return nil
}
Expand Down Expand Up @@ -163,6 +177,21 @@ func (provider *Simplefs) GetMultiLevel(key string, req *http.Request, validator
return fresh, stale
}

func (provider *Simplefs) recoverEnoughSpaceIfNeeded(size int64) {
if provider.directorySize > -1 && provider.actualSize+size > provider.directorySize {
provider.cache.RangeBackwards(func(item *ttlcache.Item[string, []byte]) bool {
// Remove the oldest item if there is not enough space.
//nolint:godox
// TODO: open a PR to expose a range that iterate on LRU items.
provider.cache.Delete(string(item.Value()))

return false
})

provider.recoverEnoughSpaceIfNeeded(size)
}
}

// SetMultiLevel tries to store the key with the given value and update the mapping key to store metadata.
func (provider *Simplefs) SetMultiLevel(baseKey, variedKey string, value []byte, variedHeaders http.Header, etag string, duration time.Duration, realKey string) error {
now := time.Now()
Expand All @@ -174,6 +203,8 @@ func (provider *Simplefs) SetMultiLevel(baseKey, variedKey string, value []byte,
return err
}

provider.recoverEnoughSpaceIfNeeded(int64(compressed.Len()))

joinedFP := filepath.Join(provider.path, url.PathEscape(variedKey))
//nolint:gosec
if err := os.WriteFile(joinedFP, compressed.Bytes(), 0o644); err != nil {
Expand All @@ -188,7 +219,7 @@ func (provider *Simplefs) SetMultiLevel(baseKey, variedKey string, value []byte,
item := provider.cache.Get(mappingKey)

if item == nil {
provider.logger.Errorf("Impossible to get the mapping key %s in Simplefs", mappingKey)
provider.logger.Warnf("Impossible to get the mapping key %s in Simplefs", mappingKey)

item = &ttlcache.Item[string, []byte]{}
}
Expand Down Expand Up @@ -240,6 +271,57 @@ func (provider *Simplefs) DeleteMany(key string) {

// Init method will.
func (provider *Simplefs) Init() error {
provider.cache.OnInsertion(func(_ context.Context, item *ttlcache.Item[string, []byte]) {
if strings.Contains(item.Key(), core.MappingKeyPrefix) {
return
}

info, err := os.Stat(string(item.Value()))
if err != nil {
provider.logger.Errorf("impossible to get the file size %s: %#v", item.Key(), err)

return
}

provider.mu.Lock()
provider.actualSize += info.Size()
provider.logger.Debugf("Actual size add: %d", provider.actualSize, info.Size())
provider.mu.Unlock()
})

provider.cache.OnEviction(func(_ context.Context, _ ttlcache.EvictionReason, item *ttlcache.Item[string, []byte]) {
if strings.Contains(string(item.Value()), core.MappingKeyPrefix) {
return
}

info, err := os.Stat(string(item.Value()))
if err != nil {
provider.logger.Errorf("impossible to get the file size %s: %#v", item.Key(), err)

return
}

provider.mu.Lock()
provider.actualSize -= info.Size()
provider.logger.Debugf("Actual size remove: %d", provider.actualSize, info.Size())
provider.mu.Unlock()

if err := onEvict(string(item.Value())); err != nil {
provider.logger.Errorf("impossible to remove the file %s: %#v", item.Key(), err)
}
})

files, _ := os.ReadDir(provider.path)
provider.logger.Debugf("Regenerating simplefs cache from files in the given directory.")

for _, f := range files {
if !f.IsDir() {
info, _ := f.Info()
provider.actualSize += info.Size()
provider.logger.Debugf("Add %v bytes to the actual size, sum to %v bytes.", info.Size(), provider.actualSize)
}
}

return nil
}

Expand Down
25 changes: 25 additions & 0 deletions simplefs/simplefs_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package simplefs_test

import (
"fmt"
"net/http"
"testing"
"time"

Expand Down Expand Up @@ -133,3 +135,26 @@ func TestSimplefs_Init(t *testing.T) {
t.Error("Impossible to init Simplefs provider")
}
}

func TestSimplefs_EvictAfterXSeconds(t *testing.T) {
client, _ := getSimplefsInstance()
_ = client.Init()

for i := range 10 {
key := fmt.Sprintf("Test_%d", i)
_ = client.SetMultiLevel(key, key, []byte(baseValue), http.Header{}, "", time.Second, key)
time.Sleep(100 * time.Millisecond)
}

res := client.Get("Test_0")
if len(res) != 0 {
t.Errorf("Key %s should be evicted", "Test_0")
}

res = client.Get("Test_9")
if len(res) == 0 {
t.Errorf("Key %s should exist", "Test_9")
}

time.Sleep(3 * time.Second)
}
Loading