From 9ecbfcb97babbbd38c3672f90362451b8984685c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Mon, 5 Jun 2023 00:18:38 +0200 Subject: [PATCH] trace jsoncs3 caches MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- .../jsoncs3/providercache/providercache.go | 39 ++++++++++++++++--- .../receivedsharecache/receivedsharecache.go | 33 +++++++++++++--- .../manager/jsoncs3/sharecache/sharecache.go | 35 +++++++++++++++-- pkg/storage/utils/metadata/cs3.go | 37 ++++++++++++++++++ 4 files changed, 129 insertions(+), 15 deletions(-) diff --git a/pkg/share/manager/jsoncs3/providercache/providercache.go b/pkg/share/manager/jsoncs3/providercache/providercache.go index 528922ac49..2fc4ed1826 100644 --- a/pkg/share/manager/jsoncs3/providercache/providercache.go +++ b/pkg/share/manager/jsoncs3/providercache/providercache.go @@ -33,8 +33,13 @@ import ( "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/storage/utils/metadata" "github.com/cs3org/reva/v2/pkg/utils" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" ) +// name is the Tracer name used to identify this instrumentation library. +const tracerName = "providercache" + // Cache holds share information structured by provider and space type Cache struct { Providers map[string]*Spaces @@ -106,6 +111,10 @@ func New(s metadata.Storage, ttl time.Duration) Cache { // Add adds a share to the cache func (c *Cache) Add(ctx context.Context, storageID, spaceID, shareID string, share *collaboration.Share) error { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Add") + defer span.End() + span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID), attribute.String("cs3.shareid", shareID)) + switch { case storageID == "": return fmt.Errorf("missing storage id") @@ -122,6 +131,10 @@ func (c *Cache) Add(ctx context.Context, storageID, spaceID, shareID string, sha // Remove removes a share from the cache func (c *Cache) Remove(ctx context.Context, storageID, spaceID, shareID string) error { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Remove") + defer span.End() + span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID), attribute.String("cs3.shareid", shareID)) + if c.Providers[storageID] == nil || c.Providers[storageID].Spaces[spaceID] == nil { return nil @@ -150,6 +163,10 @@ func (c *Cache) ListSpace(storageID, spaceID string) *Shares { // PersistWithTime persists the data of one space if it has not been modified since the given mtime func (c *Cache) PersistWithTime(ctx context.Context, storageID, spaceID string, mtime time.Time) error { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "PersistWithTime") + defer span.End() + span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID)) + if c.Providers[storageID] == nil || c.Providers[storageID].Spaces[spaceID] == nil { return nil } @@ -187,15 +204,20 @@ func (c *Cache) Persist(ctx context.Context, storageID, spaceID string) error { // Sync updates the in-memory data with the data from the storage if it is outdated func (c *Cache) Sync(ctx context.Context, storageID, spaceID string) error { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Sync") + defer span.End() + + span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID)) + log := appctx.GetLogger(ctx).With().Str("storageID", storageID).Str("spaceID", spaceID).Logger() - log.Debug().Msg("Syncing provider cache...") var mtime time.Time if c.Providers[storageID] != nil && c.Providers[storageID].Spaces[spaceID] != nil { mtime = c.Providers[storageID].Spaces[spaceID].Mtime if time.Now().Before(c.Providers[storageID].Spaces[spaceID].nextSync) { - log.Debug().Msg("Skipping provider cache sync, it was just recently synced...") + span.AddEvent("skip sync") + span.SetStatus(codes.Ok, "") return nil } c.Providers[storageID].Spaces[spaceID].nextSync = time.Now().Add(c.ttl) @@ -207,28 +229,33 @@ func (c *Cache) Sync(ctx context.Context, storageID, spaceID string) error { info, err := c.storage.Stat(ctx, jsonPath) if err != nil { if _, ok := err.(errtypes.NotFound); ok { - log.Debug().Msg("no json file, nothing to sync") + span.AddEvent("no file") + span.SetStatus(codes.Ok, "") return nil // Nothing to sync against } if _, ok := err.(*os.PathError); ok { - log.Debug().Msg("no storage dir, nothing to sync") + span.AddEvent("no dir") + span.SetStatus(codes.Ok, "") return nil // Nothing to sync against } + span.SetStatus(codes.Error, fmt.Sprintf("Failed to stat the provider cache: %s", err.Error())) log.Error().Err(err).Msg("Failed to stat the provider cache") return err } // check mtime of /users/{userid}/created.json if utils.TSToTime(info.Mtime).After(mtime) { - log.Debug().Msg("Updating provider cache...") + span.AddEvent("updating cache") // - update cached list of created shares for the user in memory if changed createdBlob, err := c.storage.SimpleDownload(ctx, jsonPath) if err != nil { + span.SetStatus(codes.Error, fmt.Sprintf("Failed to download the provider cache: %s", err.Error())) log.Error().Err(err).Msg("Failed to download the provider cache") return err } newShares := &Shares{} err = json.Unmarshal(createdBlob, newShares) if err != nil { + span.SetStatus(codes.Error, fmt.Sprintf("Failed to unmarshal the provider cache: %s", err.Error())) log.Error().Err(err).Msg("Failed to unmarshal the provider cache") return err } @@ -236,7 +263,7 @@ func (c *Cache) Sync(ctx context.Context, storageID, spaceID string) error { c.initializeIfNeeded(storageID, spaceID) c.Providers[storageID].Spaces[spaceID] = newShares } - log.Debug().Msg("Provider cache is up to date") + span.SetStatus(codes.Ok, "") return nil } diff --git a/pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache.go b/pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache.go index 0693b802ce..c58e761d86 100644 --- a/pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache.go +++ b/pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache.go @@ -21,6 +21,7 @@ package receivedsharecache import ( "context" "encoding/json" + "fmt" "path" "path/filepath" "time" @@ -31,8 +32,13 @@ import ( "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/storage/utils/metadata" "github.com/cs3org/reva/v2/pkg/utils" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" ) +// name is the Tracer name used to identify this instrumentation library. +const tracerName = "receivedsharecache" + // Cache stores the list of received shares and their states // It functions as an in-memory cache with a persistence layer // The storage is sharded by user @@ -74,6 +80,10 @@ func New(s metadata.Storage, ttl time.Duration) Cache { // Add adds a new entry to the cache func (c *Cache) Add(ctx context.Context, userID, spaceID string, rs *collaboration.ReceivedShare) error { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Add") + defer span.End() + span.SetAttributes(attribute.String("cs3.userid", userID), attribute.String("cs3.spaceid", spaceID)) + if c.ReceivedSpaces[userID] == nil { c.ReceivedSpaces[userID] = &Spaces{ Spaces: map[string]*Space{}, @@ -106,13 +116,17 @@ func (c *Cache) Get(userID, spaceID, shareID string) *State { // Sync updates the in-memory data with the data from the storage if it is outdated func (c *Cache) Sync(ctx context.Context, userID string) error { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Sync") + defer span.End() + span.SetAttributes(attribute.String("cs3.userid", userID)) + log := appctx.GetLogger(ctx).With().Str("userID", userID).Logger() - log.Debug().Msg("Syncing received share cache...") var mtime time.Time if c.ReceivedSpaces[userID] != nil { if time.Now().Before(c.ReceivedSpaces[userID].nextSync) { - log.Debug().Msg("Skipping received share cache sync, it was just recently synced...") + span.AddEvent("skip sync") + span.SetStatus(codes.Ok, "") return nil } c.ReceivedSpaces[userID].nextSync = time.Now().Add(c.ttl) @@ -123,38 +137,47 @@ func (c *Cache) Sync(ctx context.Context, userID string) error { } jsonPath := userJSONPath(userID) - info, err := c.storage.Stat(ctx, jsonPath) + info, err := c.storage.Stat(ctx, jsonPath) // TODO we only need the mtime ... use fieldmask to make the request cheaper if err != nil { if _, ok := err.(errtypes.NotFound); ok { + span.AddEvent("no file") + span.SetStatus(codes.Ok, "") return nil // Nothing to sync against } + span.SetStatus(codes.Error, fmt.Sprintf("Failed to stat the received share: %s", err.Error())) log.Error().Err(err).Msg("Failed to stat the received share") return err } // check mtime of /users/{userid}/created.json if utils.TSToTime(info.Mtime).After(mtime) { - log.Debug().Msg("Updating received share cache...") + span.AddEvent("updating cache") // - update cached list of created shares for the user in memory if changed createdBlob, err := c.storage.SimpleDownload(ctx, jsonPath) if err != nil { + span.SetStatus(codes.Error, fmt.Sprintf("Failed to download the received share: %s", err.Error())) log.Error().Err(err).Msg("Failed to download the received share") return err } newSpaces := &Spaces{} err = json.Unmarshal(createdBlob, newSpaces) if err != nil { + span.SetStatus(codes.Error, fmt.Sprintf("Failed to unmarshal the received share: %s", err.Error())) log.Error().Err(err).Msg("Failed to unmarshal the received share") return err } newSpaces.Mtime = utils.TSToTime(info.Mtime) c.ReceivedSpaces[userID] = newSpaces } - log.Debug().Msg("Received share cache is up to date") + span.SetStatus(codes.Ok, "") return nil } // Persist persists the data for one user to the storage func (c *Cache) Persist(ctx context.Context, userID string) error { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Persist") + defer span.End() + span.SetAttributes(attribute.String("cs3.userid", userID)) + if c.ReceivedSpaces[userID] == nil { return nil } diff --git a/pkg/share/manager/jsoncs3/sharecache/sharecache.go b/pkg/share/manager/jsoncs3/sharecache/sharecache.go index 7fdd7e392d..78ed5aacca 100644 --- a/pkg/share/manager/jsoncs3/sharecache/sharecache.go +++ b/pkg/share/manager/jsoncs3/sharecache/sharecache.go @@ -21,6 +21,7 @@ package sharecache import ( "context" "encoding/json" + "fmt" "path" "path/filepath" "time" @@ -30,8 +31,13 @@ import ( "github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/shareid" "github.com/cs3org/reva/v2/pkg/storage/utils/metadata" "github.com/cs3org/reva/v2/pkg/utils" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" ) +// name is the Tracer name used to identify this instrumentation library. +const tracerName = "sharecache" + // Cache caches the list of share ids for users/groups // It functions as an in-memory cache with a persistence layer // The storage is sharded by user/group @@ -71,6 +77,10 @@ func New(s metadata.Storage, namespace, filename string, ttl time.Duration) Cach // Add adds a share to the cache func (c *Cache) Add(ctx context.Context, userid, shareID string) error { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Add") + defer span.End() + span.SetAttributes(attribute.String("cs3.userid", userid), attribute.String("cs3.shareid", shareID)) + storageid, spaceid, _ := shareid.Decode(shareID) ssid := storageid + shareid.IDDelimiter + spaceid @@ -94,6 +104,10 @@ func (c *Cache) Add(ctx context.Context, userid, shareID string) error { // Remove removes a share for the given user func (c *Cache) Remove(ctx context.Context, userid, shareID string) error { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Remove") + defer span.End() + span.SetAttributes(attribute.String("cs3.userid", userid), attribute.String("cs3.shareid", shareID)) + storageid, spaceid, _ := shareid.Decode(shareID) ssid := storageid + shareid.IDDelimiter + spaceid @@ -133,14 +147,18 @@ func (c *Cache) List(userid string) map[string]SpaceShareIDs { // Sync updates the in-memory data with the data from the storage if it is outdated func (c *Cache) Sync(ctx context.Context, userID string) error { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Sync") + defer span.End() + span.SetAttributes(attribute.String("cs3.userid", userID)) + log := appctx.GetLogger(ctx).With().Str("userID", userID).Logger() - log.Debug().Msg("Syncing share cache...") var mtime time.Time // - do we have a cached list of created shares for the user in memory? if usc := c.UserShares[userID]; usc != nil { if time.Now().Before(c.UserShares[userID].nextSync) { - log.Debug().Msg("Skipping share cache sync, it was just recently synced...") + span.AddEvent("skip sync") + span.SetStatus(codes.Ok, "") return nil } c.UserShares[userID].nextSync = time.Now().Add(c.ttl) @@ -155,35 +173,44 @@ func (c *Cache) Sync(ctx context.Context, userID string) error { info, err := c.storage.Stat(ctx, userCreatedPath) if err != nil { if _, ok := err.(errtypes.NotFound); ok { + span.AddEvent("no file") + span.SetStatus(codes.Ok, "") return nil // Nothing to sync against } + span.SetStatus(codes.Error, fmt.Sprintf("Failed to stat the share cache: %s", err.Error())) log.Error().Err(err).Msg("Failed to stat the share cache") return err } // check mtime of /users/{userid}/created.json if utils.TSToTime(info.Mtime).After(mtime) { - log.Debug().Msg("Updating share cache...") + span.AddEvent("updating cache") // - update cached list of created shares for the user in memory if changed createdBlob, err := c.storage.SimpleDownload(ctx, userCreatedPath) if err != nil { + span.SetStatus(codes.Error, fmt.Sprintf("Failed to download the share cache: %s", err.Error())) log.Error().Err(err).Msg("Failed to download the share cache") return err } newShareCache := &UserShareCache{} err = json.Unmarshal(createdBlob, newShareCache) if err != nil { + span.SetStatus(codes.Error, fmt.Sprintf("Failed to unmarshal the share cache: %s", err.Error())) log.Error().Err(err).Msg("Failed to unmarshal the share cache") return err } newShareCache.Mtime = utils.TSToTime(info.Mtime) c.UserShares[userID] = newShareCache } - log.Debug().Msg("Share cache is up to date") + span.SetStatus(codes.Ok, "") return nil } // Persist persists the data for one user/group to the storage func (c *Cache) Persist(ctx context.Context, userid string) error { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Persist") + defer span.End() + span.SetAttributes(attribute.String("cs3.userid", userid)) + oldMtime := c.UserShares[userid].Mtime c.UserShares[userid].Mtime = time.Now() diff --git a/pkg/storage/utils/metadata/cs3.go b/pkg/storage/utils/metadata/cs3.go index c6b1fda1b5..f78bca5979 100644 --- a/pkg/storage/utils/metadata/cs3.go +++ b/pkg/storage/utils/metadata/cs3.go @@ -32,6 +32,7 @@ import ( rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" + "github.com/cs3org/reva/v2/pkg/appctx" ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/rgrpc/status" @@ -40,6 +41,9 @@ import ( "google.golang.org/grpc/metadata" ) +// name is the Tracer name used to identify this instrumentation library. +const tracerName = "metadata.cs3" + // CS3 represents a metadata storage with a cs3 storage backend type CS3 struct { providerSelector pool.Selectable[provider.ProviderAPIClient] @@ -123,6 +127,9 @@ func (cs3 *CS3) Init(ctx context.Context, spaceid string) (err error) { // SimpleUpload uploads a file to the metadata storage func (cs3 *CS3) SimpleUpload(ctx context.Context, uploadpath string, content []byte) error { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "SimpleUpload") + defer span.End() + return cs3.Upload(ctx, UploadRequest{ Path: uploadpath, Content: content, @@ -131,6 +138,9 @@ func (cs3 *CS3) SimpleUpload(ctx context.Context, uploadpath string, content []b // Upload uploads a file to the metadata storage func (cs3 *CS3) Upload(ctx context.Context, req UploadRequest) error { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Upload") + defer span.End() + client, err := cs3.providerSelector.Next() if err != nil { return err @@ -194,6 +204,9 @@ func (cs3 *CS3) Upload(ctx context.Context, req UploadRequest) error { // Stat returns the metadata for the given path func (cs3 *CS3) Stat(ctx context.Context, path string) (*provider.ResourceInfo, error) { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Stat") + defer span.End() + client, err := cs3.providerSelector.Next() if err != nil { return nil, err @@ -223,6 +236,9 @@ func (cs3 *CS3) Stat(ctx context.Context, path string) (*provider.ResourceInfo, // SimpleDownload reads a file from the metadata storage func (cs3 *CS3) SimpleDownload(ctx context.Context, downloadpath string) (content []byte, err error) { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "SimpleDownload") + defer span.End() + client, err := cs3.providerSelector.Next() if err != nil { return nil, err @@ -286,6 +302,9 @@ func (cs3 *CS3) SimpleDownload(ctx context.Context, downloadpath string) (conten // Delete deletes a path func (cs3 *CS3) Delete(ctx context.Context, path string) error { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Delete") + defer span.End() + client, err := cs3.providerSelector.Next() if err != nil { return err @@ -313,6 +332,9 @@ func (cs3 *CS3) Delete(ctx context.Context, path string) error { // ReadDir returns the entries in a given directory func (cs3 *CS3) ReadDir(ctx context.Context, path string) ([]string, error) { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "ReadDir") + defer span.End() + infos, err := cs3.ListDir(ctx, path) if err != nil { return nil, err @@ -327,6 +349,9 @@ func (cs3 *CS3) ReadDir(ctx context.Context, path string) ([]string, error) { // ListDir returns a list of ResourceInfos for the entries in a given directory func (cs3 *CS3) ListDir(ctx context.Context, path string) ([]*provider.ResourceInfo, error) { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "ListDir") + defer span.End() + client, err := cs3.providerSelector.Next() if err != nil { return nil, err @@ -356,6 +381,9 @@ func (cs3 *CS3) ListDir(ctx context.Context, path string) ([]*provider.ResourceI // MakeDirIfNotExist will create a root node in the metadata storage. Requires an authenticated context. func (cs3 *CS3) MakeDirIfNotExist(ctx context.Context, folder string) error { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "MakeDirIfNotExist") + defer span.End() + client, err := cs3.providerSelector.Next() if err != nil { return err @@ -404,6 +432,9 @@ func (cs3 *CS3) MakeDirIfNotExist(ctx context.Context, folder string) error { // CreateSymlink creates a symlink func (cs3 *CS3) CreateSymlink(ctx context.Context, oldname, newname string) error { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "CreateSymlink") + defer span.End() + if _, err := cs3.ResolveSymlink(ctx, newname); err == nil { return os.ErrExist } @@ -413,6 +444,9 @@ func (cs3 *CS3) CreateSymlink(ctx context.Context, oldname, newname string) erro // ResolveSymlink resolves a symlink func (cs3 *CS3) ResolveSymlink(ctx context.Context, name string) (string, error) { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "ResolveSymlink") + defer span.End() + b, err := cs3.SimpleDownload(ctx, name) if err != nil { if errors.Is(err, errtypes.NotFound("")) { @@ -425,6 +459,9 @@ func (cs3 *CS3) ResolveSymlink(ctx context.Context, name string) (string, error) } func (cs3 *CS3) getAuthContext(ctx context.Context) (context.Context, error) { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "getAuthContext") + defer span.End() + client, err := cs3.gatewaySelector.Next() if err != nil { return nil, err