Skip to content

Commit

Permalink
Add jsoncs3 tracing (#3941)
Browse files Browse the repository at this point in the history
* Add jsoncs3 tracing

This is extracted from commits 9fa3d4f and
9ecbfcb in the PR #3809

It didn't work to directly cherry-pick the commits.

* Add changelog entry for jsoncs3 tracing.

* Fix invalid context usage and correct changelog category.
  • Loading branch information
Daniel Swärd authored Jun 6, 2023
1 parent 0c2e409 commit 0dda564
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 16 deletions.
4 changes: 4 additions & 0 deletions changelog/unreleased/jsoncs3-tracing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Enhancement: Adding tracing for jsoncs3

https://github.com/cs3org/reva/pull/3941

37 changes: 37 additions & 0 deletions pkg/share/manager/jsoncs3/jsoncs3.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ import (
- if the mtime changed we download the file to update the local cache
*/

// name is the Tracer name used to identify this instrumentation library.
const tracerName = "jsoncs3"

func init() {
registry.Register("jsoncs3", NewDefault)
}
Expand Down Expand Up @@ -264,6 +267,8 @@ func (m *Manager) initialize() error {

// Share creates a new share
func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *collaboration.ShareGrant) (*collaboration.Share, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Share")
defer span.End()
if err := m.initialize(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -418,6 +423,8 @@ func (m *Manager) get(ctx context.Context, ref *collaboration.ShareReference) (s

// GetShare gets the information for a share by the given ref.
func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReference) (*collaboration.Share, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "GetShare")
defer span.End()
if err := m.initialize(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -468,6 +475,9 @@ func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReferenc

// Unshare deletes a share
func (m *Manager) Unshare(ctx context.Context, ref *collaboration.ShareReference) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Unshare")
defer span.End()

if err := m.initialize(); err != nil {
return err
}
Expand All @@ -491,6 +501,9 @@ func (m *Manager) Unshare(ctx context.Context, ref *collaboration.ShareReference

// UpdateShare updates the mode of the given share.
func (m *Manager) UpdateShare(ctx context.Context, ref *collaboration.ShareReference, p *collaboration.SharePermissions, updated *collaboration.Share, fieldMask *field_mask.FieldMask) (*collaboration.Share, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "UpdateShare")
defer span.End()

if err := m.initialize(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -570,6 +583,9 @@ func (m *Manager) UpdateShare(ctx context.Context, ref *collaboration.ShareRefer

// ListShares returns the shares created by the user
func (m *Manager) ListShares(ctx context.Context, filters []*collaboration.Filter) ([]*collaboration.Share, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "ListShares")
defer span.End()

if err := m.initialize(); err != nil {
return nil, err
}
Expand All @@ -587,6 +603,9 @@ func (m *Manager) ListShares(ctx context.Context, filters []*collaboration.Filte
}

func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, filters []*collaboration.Filter) ([]*collaboration.Share, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "listSharesByIDs")
defer span.End()

providerSpaces := make(map[string]map[string]struct{})
for _, f := range share.FilterFiltersByType(filters, collaboration.Filter_TYPE_RESOURCE_ID) {
storageID := f.GetResourceId().GetStorageId()
Expand Down Expand Up @@ -654,6 +673,9 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f
}

func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User, filters []*collaboration.Filter) ([]*collaboration.Share, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "listCreatedShares")
defer span.End()

var ss []*collaboration.Share

if err := m.CreatedCache.Sync(ctx, user.Id.OpaqueId); err != nil {
Expand Down Expand Up @@ -701,6 +723,9 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User,

// ListReceivedShares returns the list of shares the user has access to.
func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaboration.Filter) ([]*collaboration.ReceivedShare, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "ListReceivedShares")
defer span.End()

if err := m.initialize(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -853,6 +878,9 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati

// convert must be called in a lock-controlled block.
func (m *Manager) convert(ctx context.Context, userID string, s *collaboration.Share) *collaboration.ReceivedShare {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "convert")
defer span.End()

rs := &collaboration.ReceivedShare{
Share: s,
State: collaboration.ShareState_SHARE_STATE_PENDING,
Expand All @@ -879,6 +907,9 @@ func (m *Manager) GetReceivedShare(ctx context.Context, ref *collaboration.Share
}

func (m *Manager) getReceived(ctx context.Context, ref *collaboration.ShareReference) (*collaboration.ReceivedShare, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "getReceived")
defer span.End()

m.Lock()
defer m.Unlock()
s, err := m.get(ctx, ref)
Expand Down Expand Up @@ -910,6 +941,9 @@ func (m *Manager) getReceived(ctx context.Context, ref *collaboration.ShareRefer

// UpdateReceivedShare updates the received share with share state.
func (m *Manager) UpdateReceivedShare(ctx context.Context, receivedShare *collaboration.ReceivedShare, fieldMask *field_mask.FieldMask) (*collaboration.ReceivedShare, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "UpdateReceivedShare")
defer span.End()

if err := m.initialize(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1020,6 +1054,9 @@ func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Shar
}

func (m *Manager) removeShare(ctx context.Context, s *collaboration.Share) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "removeShare")
defer span.End()

storageID, spaceID, _ := shareid.Decode(s.Id.OpaqueId)
err := m.Cache.Remove(ctx, storageID, spaceID, s.Id.OpaqueId)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
Expand Down
39 changes: 33 additions & 6 deletions pkg/share/manager/jsoncs3/providercache/providercache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,13 @@ import (
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/storage/utils/metadata"
"github.com/cs3org/reva/v2/pkg/utils"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
)

// name is the Tracer name used to identify this instrumentation library.
const tracerName = "providercache"

// Cache holds share information structured by provider and space
type Cache struct {
Providers map[string]*Spaces
Expand Down Expand Up @@ -106,6 +111,10 @@ func New(s metadata.Storage, ttl time.Duration) Cache {

// Add adds a share to the cache
func (c *Cache) Add(ctx context.Context, storageID, spaceID, shareID string, share *collaboration.Share) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Add")
defer span.End()
span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID), attribute.String("cs3.shareid", shareID))

switch {
case storageID == "":
return fmt.Errorf("missing storage id")
Expand All @@ -122,6 +131,10 @@ func (c *Cache) Add(ctx context.Context, storageID, spaceID, shareID string, sha

// Remove removes a share from the cache
func (c *Cache) Remove(ctx context.Context, storageID, spaceID, shareID string) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Remove")
defer span.End()
span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID), attribute.String("cs3.shareid", shareID))

if c.Providers[storageID] == nil ||
c.Providers[storageID].Spaces[spaceID] == nil {
return nil
Expand Down Expand Up @@ -150,6 +163,10 @@ func (c *Cache) ListSpace(storageID, spaceID string) *Shares {

// PersistWithTime persists the data of one space if it has not been modified since the given mtime
func (c *Cache) PersistWithTime(ctx context.Context, storageID, spaceID string, mtime time.Time) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "PersistWithTime")
defer span.End()
span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID))

if c.Providers[storageID] == nil || c.Providers[storageID].Spaces[spaceID] == nil {
return nil
}
Expand Down Expand Up @@ -187,15 +204,20 @@ func (c *Cache) Persist(ctx context.Context, storageID, spaceID string) error {

// Sync updates the in-memory data with the data from the storage if it is outdated
func (c *Cache) Sync(ctx context.Context, storageID, spaceID string) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Sync")
defer span.End()

span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID))

log := appctx.GetLogger(ctx).With().Str("storageID", storageID).Str("spaceID", spaceID).Logger()
log.Debug().Msg("Syncing provider cache...")

var mtime time.Time
if c.Providers[storageID] != nil && c.Providers[storageID].Spaces[spaceID] != nil {
mtime = c.Providers[storageID].Spaces[spaceID].Mtime

if time.Now().Before(c.Providers[storageID].Spaces[spaceID].nextSync) {
log.Debug().Msg("Skipping provider cache sync, it was just recently synced...")
span.AddEvent("skip sync")
span.SetStatus(codes.Ok, "")
return nil
}
c.Providers[storageID].Spaces[spaceID].nextSync = time.Now().Add(c.ttl)
Expand All @@ -207,36 +229,41 @@ func (c *Cache) Sync(ctx context.Context, storageID, spaceID string) error {
info, err := c.storage.Stat(ctx, jsonPath)
if err != nil {
if _, ok := err.(errtypes.NotFound); ok {
log.Debug().Msg("no json file, nothing to sync")
span.AddEvent("no file")
span.SetStatus(codes.Ok, "")
return nil // Nothing to sync against
}
if _, ok := err.(*os.PathError); ok {
log.Debug().Msg("no storage dir, nothing to sync")
span.AddEvent("no dir")
span.SetStatus(codes.Ok, "")
return nil // Nothing to sync against
}
span.SetStatus(codes.Error, fmt.Sprintf("Failed to stat the provider cache: %s", err.Error()))
log.Error().Err(err).Msg("Failed to stat the provider cache")
return err
}
// check mtime of /users/{userid}/created.json
if utils.TSToTime(info.Mtime).After(mtime) {
log.Debug().Msg("Updating provider cache...")
span.AddEvent("updating cache")
// - update cached list of created shares for the user in memory if changed
createdBlob, err := c.storage.SimpleDownload(ctx, jsonPath)
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Failed to download the provider cache: %s", err.Error()))
log.Error().Err(err).Msg("Failed to download the provider cache")
return err
}
newShares := &Shares{}
err = json.Unmarshal(createdBlob, newShares)
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Failed to unmarshal the provider cache: %s", err.Error()))
log.Error().Err(err).Msg("Failed to unmarshal the provider cache")
return err
}
newShares.Mtime = utils.TSToTime(info.Mtime)
c.initializeIfNeeded(storageID, spaceID)
c.Providers[storageID].Spaces[spaceID] = newShares
}
log.Debug().Msg("Provider cache is up to date")
span.SetStatus(codes.Ok, "")
return nil
}

Expand Down
33 changes: 28 additions & 5 deletions pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package receivedsharecache
import (
"context"
"encoding/json"
"fmt"
"path"
"path/filepath"
"time"
Expand All @@ -31,8 +32,13 @@ import (
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/storage/utils/metadata"
"github.com/cs3org/reva/v2/pkg/utils"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
)

// name is the Tracer name used to identify this instrumentation library.
const tracerName = "receivedsharecache"

// Cache stores the list of received shares and their states
// It functions as an in-memory cache with a persistence layer
// The storage is sharded by user
Expand Down Expand Up @@ -74,6 +80,10 @@ func New(s metadata.Storage, ttl time.Duration) Cache {

// Add adds a new entry to the cache
func (c *Cache) Add(ctx context.Context, userID, spaceID string, rs *collaboration.ReceivedShare) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Add")
defer span.End()
span.SetAttributes(attribute.String("cs3.userid", userID), attribute.String("cs3.spaceid", spaceID))

if c.ReceivedSpaces[userID] == nil {
c.ReceivedSpaces[userID] = &Spaces{
Spaces: map[string]*Space{},
Expand Down Expand Up @@ -106,13 +116,17 @@ func (c *Cache) Get(userID, spaceID, shareID string) *State {

// Sync updates the in-memory data with the data from the storage if it is outdated
func (c *Cache) Sync(ctx context.Context, userID string) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Sync")
defer span.End()
span.SetAttributes(attribute.String("cs3.userid", userID))

log := appctx.GetLogger(ctx).With().Str("userID", userID).Logger()
log.Debug().Msg("Syncing received share cache...")

var mtime time.Time
if c.ReceivedSpaces[userID] != nil {
if time.Now().Before(c.ReceivedSpaces[userID].nextSync) {
log.Debug().Msg("Skipping received share cache sync, it was just recently synced...")
span.AddEvent("skip sync")
span.SetStatus(codes.Ok, "")
return nil
}
c.ReceivedSpaces[userID].nextSync = time.Now().Add(c.ttl)
Expand All @@ -123,38 +137,47 @@ func (c *Cache) Sync(ctx context.Context, userID string) error {
}

jsonPath := userJSONPath(userID)
info, err := c.storage.Stat(ctx, jsonPath)
info, err := c.storage.Stat(ctx, jsonPath) // TODO we only need the mtime ... use fieldmask to make the request cheaper
if err != nil {
if _, ok := err.(errtypes.NotFound); ok {
span.AddEvent("no file")
span.SetStatus(codes.Ok, "")
return nil // Nothing to sync against
}
span.SetStatus(codes.Error, fmt.Sprintf("Failed to stat the received share: %s", err.Error()))
log.Error().Err(err).Msg("Failed to stat the received share")
return err
}
// check mtime of /users/{userid}/created.json
if utils.TSToTime(info.Mtime).After(mtime) {
log.Debug().Msg("Updating received share cache...")
span.AddEvent("updating cache")
// - update cached list of created shares for the user in memory if changed
createdBlob, err := c.storage.SimpleDownload(ctx, jsonPath)
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Failed to download the received share: %s", err.Error()))
log.Error().Err(err).Msg("Failed to download the received share")
return err
}
newSpaces := &Spaces{}
err = json.Unmarshal(createdBlob, newSpaces)
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Failed to unmarshal the received share: %s", err.Error()))
log.Error().Err(err).Msg("Failed to unmarshal the received share")
return err
}
newSpaces.Mtime = utils.TSToTime(info.Mtime)
c.ReceivedSpaces[userID] = newSpaces
}
log.Debug().Msg("Received share cache is up to date")
span.SetStatus(codes.Ok, "")
return nil
}

// Persist persists the data for one user to the storage
func (c *Cache) Persist(ctx context.Context, userID string) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Persist")
defer span.End()
span.SetAttributes(attribute.String("cs3.userid", userID))

if c.ReceivedSpaces[userID] == nil {
return nil
}
Expand Down
Loading

0 comments on commit 0dda564

Please sign in to comment.