Skip to content

Commit

Permalink
trace jsoncs3 caches
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <[email protected]>
  • Loading branch information
butonic committed Jun 4, 2023
1 parent c3dd573 commit 9ecbfcb
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 15 deletions.
39 changes: 33 additions & 6 deletions pkg/share/manager/jsoncs3/providercache/providercache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,13 @@ import (
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/storage/utils/metadata"
"github.com/cs3org/reva/v2/pkg/utils"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
)

// name is the Tracer name used to identify this instrumentation library.
const tracerName = "providercache"

// Cache holds share information structured by provider and space
type Cache struct {
Providers map[string]*Spaces
Expand Down Expand Up @@ -106,6 +111,10 @@ func New(s metadata.Storage, ttl time.Duration) Cache {

// Add adds a share to the cache
func (c *Cache) Add(ctx context.Context, storageID, spaceID, shareID string, share *collaboration.Share) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Add")
defer span.End()
span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID), attribute.String("cs3.shareid", shareID))

switch {
case storageID == "":
return fmt.Errorf("missing storage id")
Expand All @@ -122,6 +131,10 @@ func (c *Cache) Add(ctx context.Context, storageID, spaceID, shareID string, sha

// Remove removes a share from the cache
func (c *Cache) Remove(ctx context.Context, storageID, spaceID, shareID string) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Remove")
defer span.End()
span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID), attribute.String("cs3.shareid", shareID))

if c.Providers[storageID] == nil ||
c.Providers[storageID].Spaces[spaceID] == nil {
return nil
Expand Down Expand Up @@ -150,6 +163,10 @@ func (c *Cache) ListSpace(storageID, spaceID string) *Shares {

// PersistWithTime persists the data of one space if it has not been modified since the given mtime
func (c *Cache) PersistWithTime(ctx context.Context, storageID, spaceID string, mtime time.Time) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "PersistWithTime")
defer span.End()
span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID))

if c.Providers[storageID] == nil || c.Providers[storageID].Spaces[spaceID] == nil {
return nil
}
Expand Down Expand Up @@ -187,15 +204,20 @@ func (c *Cache) Persist(ctx context.Context, storageID, spaceID string) error {

// Sync updates the in-memory data with the data from the storage if it is outdated
func (c *Cache) Sync(ctx context.Context, storageID, spaceID string) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Sync")
defer span.End()

span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID))

log := appctx.GetLogger(ctx).With().Str("storageID", storageID).Str("spaceID", spaceID).Logger()
log.Debug().Msg("Syncing provider cache...")

var mtime time.Time
if c.Providers[storageID] != nil && c.Providers[storageID].Spaces[spaceID] != nil {
mtime = c.Providers[storageID].Spaces[spaceID].Mtime

if time.Now().Before(c.Providers[storageID].Spaces[spaceID].nextSync) {
log.Debug().Msg("Skipping provider cache sync, it was just recently synced...")
span.AddEvent("skip sync")
span.SetStatus(codes.Ok, "")
return nil
}
c.Providers[storageID].Spaces[spaceID].nextSync = time.Now().Add(c.ttl)
Expand All @@ -207,36 +229,41 @@ func (c *Cache) Sync(ctx context.Context, storageID, spaceID string) error {
info, err := c.storage.Stat(ctx, jsonPath)
if err != nil {
if _, ok := err.(errtypes.NotFound); ok {
log.Debug().Msg("no json file, nothing to sync")
span.AddEvent("no file")
span.SetStatus(codes.Ok, "")
return nil // Nothing to sync against
}
if _, ok := err.(*os.PathError); ok {
log.Debug().Msg("no storage dir, nothing to sync")
span.AddEvent("no dir")
span.SetStatus(codes.Ok, "")
return nil // Nothing to sync against
}
span.SetStatus(codes.Error, fmt.Sprintf("Failed to stat the provider cache: %s", err.Error()))
log.Error().Err(err).Msg("Failed to stat the provider cache")
return err
}
// check mtime of /users/{userid}/created.json
if utils.TSToTime(info.Mtime).After(mtime) {
log.Debug().Msg("Updating provider cache...")
span.AddEvent("updating cache")
// - update cached list of created shares for the user in memory if changed
createdBlob, err := c.storage.SimpleDownload(ctx, jsonPath)
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Failed to download the provider cache: %s", err.Error()))
log.Error().Err(err).Msg("Failed to download the provider cache")
return err
}
newShares := &Shares{}
err = json.Unmarshal(createdBlob, newShares)
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Failed to unmarshal the provider cache: %s", err.Error()))
log.Error().Err(err).Msg("Failed to unmarshal the provider cache")
return err
}
newShares.Mtime = utils.TSToTime(info.Mtime)
c.initializeIfNeeded(storageID, spaceID)
c.Providers[storageID].Spaces[spaceID] = newShares
}
log.Debug().Msg("Provider cache is up to date")
span.SetStatus(codes.Ok, "")
return nil
}

Expand Down
33 changes: 28 additions & 5 deletions pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package receivedsharecache
import (
"context"
"encoding/json"
"fmt"
"path"
"path/filepath"
"time"
Expand All @@ -31,8 +32,13 @@ import (
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/storage/utils/metadata"
"github.com/cs3org/reva/v2/pkg/utils"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
)

// name is the Tracer name used to identify this instrumentation library.
const tracerName = "receivedsharecache"

// Cache stores the list of received shares and their states
// It functions as an in-memory cache with a persistence layer
// The storage is sharded by user
Expand Down Expand Up @@ -74,6 +80,10 @@ func New(s metadata.Storage, ttl time.Duration) Cache {

// Add adds a new entry to the cache
func (c *Cache) Add(ctx context.Context, userID, spaceID string, rs *collaboration.ReceivedShare) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Add")
defer span.End()
span.SetAttributes(attribute.String("cs3.userid", userID), attribute.String("cs3.spaceid", spaceID))

if c.ReceivedSpaces[userID] == nil {
c.ReceivedSpaces[userID] = &Spaces{
Spaces: map[string]*Space{},
Expand Down Expand Up @@ -106,13 +116,17 @@ func (c *Cache) Get(userID, spaceID, shareID string) *State {

// Sync updates the in-memory data with the data from the storage if it is outdated
func (c *Cache) Sync(ctx context.Context, userID string) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Sync")
defer span.End()
span.SetAttributes(attribute.String("cs3.userid", userID))

log := appctx.GetLogger(ctx).With().Str("userID", userID).Logger()
log.Debug().Msg("Syncing received share cache...")

var mtime time.Time
if c.ReceivedSpaces[userID] != nil {
if time.Now().Before(c.ReceivedSpaces[userID].nextSync) {
log.Debug().Msg("Skipping received share cache sync, it was just recently synced...")
span.AddEvent("skip sync")
span.SetStatus(codes.Ok, "")
return nil
}
c.ReceivedSpaces[userID].nextSync = time.Now().Add(c.ttl)
Expand All @@ -123,38 +137,47 @@ func (c *Cache) Sync(ctx context.Context, userID string) error {
}

jsonPath := userJSONPath(userID)
info, err := c.storage.Stat(ctx, jsonPath)
info, err := c.storage.Stat(ctx, jsonPath) // TODO we only need the mtime ... use fieldmask to make the request cheaper
if err != nil {
if _, ok := err.(errtypes.NotFound); ok {
span.AddEvent("no file")
span.SetStatus(codes.Ok, "")
return nil // Nothing to sync against
}
span.SetStatus(codes.Error, fmt.Sprintf("Failed to stat the received share: %s", err.Error()))
log.Error().Err(err).Msg("Failed to stat the received share")
return err
}
// check mtime of /users/{userid}/created.json
if utils.TSToTime(info.Mtime).After(mtime) {
log.Debug().Msg("Updating received share cache...")
span.AddEvent("updating cache")
// - update cached list of created shares for the user in memory if changed
createdBlob, err := c.storage.SimpleDownload(ctx, jsonPath)
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Failed to download the received share: %s", err.Error()))
log.Error().Err(err).Msg("Failed to download the received share")
return err
}
newSpaces := &Spaces{}
err = json.Unmarshal(createdBlob, newSpaces)
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Failed to unmarshal the received share: %s", err.Error()))
log.Error().Err(err).Msg("Failed to unmarshal the received share")
return err
}
newSpaces.Mtime = utils.TSToTime(info.Mtime)
c.ReceivedSpaces[userID] = newSpaces
}
log.Debug().Msg("Received share cache is up to date")
span.SetStatus(codes.Ok, "")
return nil
}

// Persist persists the data for one user to the storage
func (c *Cache) Persist(ctx context.Context, userID string) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Persist")
defer span.End()
span.SetAttributes(attribute.String("cs3.userid", userID))

if c.ReceivedSpaces[userID] == nil {
return nil
}
Expand Down
35 changes: 31 additions & 4 deletions pkg/share/manager/jsoncs3/sharecache/sharecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package sharecache
import (
"context"
"encoding/json"
"fmt"
"path"
"path/filepath"
"time"
Expand All @@ -30,8 +31,13 @@ import (
"github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/shareid"
"github.com/cs3org/reva/v2/pkg/storage/utils/metadata"
"github.com/cs3org/reva/v2/pkg/utils"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
)

// name is the Tracer name used to identify this instrumentation library.
const tracerName = "sharecache"

// Cache caches the list of share ids for users/groups
// It functions as an in-memory cache with a persistence layer
// The storage is sharded by user/group
Expand Down Expand Up @@ -71,6 +77,10 @@ func New(s metadata.Storage, namespace, filename string, ttl time.Duration) Cach

// Add adds a share to the cache
func (c *Cache) Add(ctx context.Context, userid, shareID string) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Add")
defer span.End()
span.SetAttributes(attribute.String("cs3.userid", userid), attribute.String("cs3.shareid", shareID))

storageid, spaceid, _ := shareid.Decode(shareID)
ssid := storageid + shareid.IDDelimiter + spaceid

Expand All @@ -94,6 +104,10 @@ func (c *Cache) Add(ctx context.Context, userid, shareID string) error {

// Remove removes a share for the given user
func (c *Cache) Remove(ctx context.Context, userid, shareID string) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Remove")
defer span.End()
span.SetAttributes(attribute.String("cs3.userid", userid), attribute.String("cs3.shareid", shareID))

storageid, spaceid, _ := shareid.Decode(shareID)
ssid := storageid + shareid.IDDelimiter + spaceid

Expand Down Expand Up @@ -133,14 +147,18 @@ func (c *Cache) List(userid string) map[string]SpaceShareIDs {

// Sync updates the in-memory data with the data from the storage if it is outdated
func (c *Cache) Sync(ctx context.Context, userID string) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Sync")
defer span.End()
span.SetAttributes(attribute.String("cs3.userid", userID))

log := appctx.GetLogger(ctx).With().Str("userID", userID).Logger()
log.Debug().Msg("Syncing share cache...")

var mtime time.Time
// - do we have a cached list of created shares for the user in memory?
if usc := c.UserShares[userID]; usc != nil {
if time.Now().Before(c.UserShares[userID].nextSync) {
log.Debug().Msg("Skipping share cache sync, it was just recently synced...")
span.AddEvent("skip sync")
span.SetStatus(codes.Ok, "")
return nil
}
c.UserShares[userID].nextSync = time.Now().Add(c.ttl)
Expand All @@ -155,35 +173,44 @@ func (c *Cache) Sync(ctx context.Context, userID string) error {
info, err := c.storage.Stat(ctx, userCreatedPath)
if err != nil {
if _, ok := err.(errtypes.NotFound); ok {
span.AddEvent("no file")
span.SetStatus(codes.Ok, "")
return nil // Nothing to sync against
}
span.SetStatus(codes.Error, fmt.Sprintf("Failed to stat the share cache: %s", err.Error()))
log.Error().Err(err).Msg("Failed to stat the share cache")
return err
}
// check mtime of /users/{userid}/created.json
if utils.TSToTime(info.Mtime).After(mtime) {
log.Debug().Msg("Updating share cache...")
span.AddEvent("updating cache")
// - update cached list of created shares for the user in memory if changed
createdBlob, err := c.storage.SimpleDownload(ctx, userCreatedPath)
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Failed to download the share cache: %s", err.Error()))
log.Error().Err(err).Msg("Failed to download the share cache")
return err
}
newShareCache := &UserShareCache{}
err = json.Unmarshal(createdBlob, newShareCache)
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Failed to unmarshal the share cache: %s", err.Error()))
log.Error().Err(err).Msg("Failed to unmarshal the share cache")
return err
}
newShareCache.Mtime = utils.TSToTime(info.Mtime)
c.UserShares[userID] = newShareCache
}
log.Debug().Msg("Share cache is up to date")
span.SetStatus(codes.Ok, "")
return nil
}

// Persist persists the data for one user/group to the storage
func (c *Cache) Persist(ctx context.Context, userid string) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Persist")
defer span.End()
span.SetAttributes(attribute.String("cs3.userid", userid))

oldMtime := c.UserShares[userid].Mtime
c.UserShares[userid].Mtime = time.Now()

Expand Down
Loading

0 comments on commit 9ecbfcb

Please sign in to comment.