Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add jsoncs3 tracing #3941

Merged
merged 3 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog/unreleased/jsoncs3-tracing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Feature: Adding tracing for jsoncs3

https://github.com/cs3org/reva/pull/3941

37 changes: 37 additions & 0 deletions pkg/share/manager/jsoncs3/jsoncs3.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ import (
- if the mtime changed we download the file to update the local cache
*/

// name is the Tracer name used to identify this instrumentation library.
const tracerName = "jsoncs3"

func init() {
registry.Register("jsoncs3", NewDefault)
}
Expand Down Expand Up @@ -264,6 +267,8 @@ func (m *Manager) initialize() error {

// Share creates a new share
func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *collaboration.ShareGrant) (*collaboration.Share, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Share")
defer span.End()
if err := m.initialize(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -418,6 +423,8 @@ func (m *Manager) get(ctx context.Context, ref *collaboration.ShareReference) (s

// GetShare gets the information for a share by the given ref.
func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReference) (*collaboration.Share, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "GetShare")
defer span.End()
if err := m.initialize(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -468,6 +475,9 @@ func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReferenc

// Unshare deletes a share
func (m *Manager) Unshare(ctx context.Context, ref *collaboration.ShareReference) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Unshare")
defer span.End()

if err := m.initialize(); err != nil {
return err
}
Expand All @@ -491,6 +501,9 @@ func (m *Manager) Unshare(ctx context.Context, ref *collaboration.ShareReference

// UpdateShare updates the mode of the given share.
func (m *Manager) UpdateShare(ctx context.Context, ref *collaboration.ShareReference, p *collaboration.SharePermissions, updated *collaboration.Share, fieldMask *field_mask.FieldMask) (*collaboration.Share, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "UpdateShare")
defer span.End()

if err := m.initialize(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -570,6 +583,9 @@ func (m *Manager) UpdateShare(ctx context.Context, ref *collaboration.ShareRefer

// ListShares returns the shares created by the user
func (m *Manager) ListShares(ctx context.Context, filters []*collaboration.Filter) ([]*collaboration.Share, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "ListShares")
defer span.End()

if err := m.initialize(); err != nil {
return nil, err
}
Expand All @@ -587,6 +603,9 @@ func (m *Manager) ListShares(ctx context.Context, filters []*collaboration.Filte
}

func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, filters []*collaboration.Filter) ([]*collaboration.Share, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "listSharesByIDs")
defer span.End()

providerSpaces := make(map[string]map[string]struct{})
for _, f := range share.FilterFiltersByType(filters, collaboration.Filter_TYPE_RESOURCE_ID) {
storageID := f.GetResourceId().GetStorageId()
Expand Down Expand Up @@ -654,6 +673,9 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f
}

func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User, filters []*collaboration.Filter) ([]*collaboration.Share, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "listCreatedShares")
defer span.End()

var ss []*collaboration.Share

if err := m.CreatedCache.Sync(ctx, user.Id.OpaqueId); err != nil {
Expand Down Expand Up @@ -701,6 +723,9 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User,

// ListReceivedShares returns the list of shares the user has access to.
func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaboration.Filter) ([]*collaboration.ReceivedShare, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "ListReceivedShares")
defer span.End()

if err := m.initialize(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -853,6 +878,9 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati

// convert must be called in a lock-controlled block.
func (m *Manager) convert(ctx context.Context, userID string, s *collaboration.Share) *collaboration.ReceivedShare {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "convert")
defer span.End()

rs := &collaboration.ReceivedShare{
Share: s,
State: collaboration.ShareState_SHARE_STATE_PENDING,
Expand All @@ -879,6 +907,9 @@ func (m *Manager) GetReceivedShare(ctx context.Context, ref *collaboration.Share
}

func (m *Manager) getReceived(ctx context.Context, ref *collaboration.ShareReference) (*collaboration.ReceivedShare, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "getReceived")
defer span.End()

m.Lock()
defer m.Unlock()
s, err := m.get(ctx, ref)
Expand Down Expand Up @@ -910,6 +941,9 @@ func (m *Manager) getReceived(ctx context.Context, ref *collaboration.ShareRefer

// UpdateReceivedShare updates the received share with share state.
func (m *Manager) UpdateReceivedShare(ctx context.Context, receivedShare *collaboration.ReceivedShare, fieldMask *field_mask.FieldMask) (*collaboration.ReceivedShare, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "UpdateReceivedShare")
defer span.End()

if err := m.initialize(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1020,6 +1054,9 @@ func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Shar
}

func (m *Manager) removeShare(ctx context.Context, s *collaboration.Share) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "removeShare")
defer span.End()

storageID, spaceID, _ := shareid.Decode(s.Id.OpaqueId)
err := m.Cache.Remove(ctx, storageID, spaceID, s.Id.OpaqueId)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
Expand Down
39 changes: 33 additions & 6 deletions pkg/share/manager/jsoncs3/providercache/providercache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,13 @@ import (
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/storage/utils/metadata"
"github.com/cs3org/reva/v2/pkg/utils"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
)

// name is the Tracer name used to identify this instrumentation library.
const tracerName = "providercache"

// Cache holds share information structured by provider and space
type Cache struct {
Providers map[string]*Spaces
Expand Down Expand Up @@ -106,6 +111,10 @@ func New(s metadata.Storage, ttl time.Duration) Cache {

// Add adds a share to the cache
func (c *Cache) Add(ctx context.Context, storageID, spaceID, shareID string, share *collaboration.Share) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Add")
defer span.End()
span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID), attribute.String("cs3.shareid", shareID))

switch {
case storageID == "":
return fmt.Errorf("missing storage id")
Expand All @@ -122,6 +131,10 @@ func (c *Cache) Add(ctx context.Context, storageID, spaceID, shareID string, sha

// Remove removes a share from the cache
func (c *Cache) Remove(ctx context.Context, storageID, spaceID, shareID string) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Remove")
defer span.End()
span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID), attribute.String("cs3.shareid", shareID))

if c.Providers[storageID] == nil ||
c.Providers[storageID].Spaces[spaceID] == nil {
return nil
Expand Down Expand Up @@ -150,6 +163,10 @@ func (c *Cache) ListSpace(storageID, spaceID string) *Shares {

// PersistWithTime persists the data of one space if it has not been modified since the given mtime
func (c *Cache) PersistWithTime(ctx context.Context, storageID, spaceID string, mtime time.Time) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "PersistWithTime")
defer span.End()
span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID))

if c.Providers[storageID] == nil || c.Providers[storageID].Spaces[spaceID] == nil {
return nil
}
Expand Down Expand Up @@ -187,15 +204,20 @@ func (c *Cache) Persist(ctx context.Context, storageID, spaceID string) error {

// Sync updates the in-memory data with the data from the storage if it is outdated
func (c *Cache) Sync(ctx context.Context, storageID, spaceID string) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Sync")
defer span.End()

span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID))

log := appctx.GetLogger(ctx).With().Str("storageID", storageID).Str("spaceID", spaceID).Logger()
log.Debug().Msg("Syncing provider cache...")

var mtime time.Time
if c.Providers[storageID] != nil && c.Providers[storageID].Spaces[spaceID] != nil {
mtime = c.Providers[storageID].Spaces[spaceID].Mtime

if time.Now().Before(c.Providers[storageID].Spaces[spaceID].nextSync) {
log.Debug().Msg("Skipping provider cache sync, it was just recently synced...")
span.AddEvent("skip sync")
span.SetStatus(codes.Ok, "")
return nil
}
c.Providers[storageID].Spaces[spaceID].nextSync = time.Now().Add(c.ttl)
Expand All @@ -207,36 +229,41 @@ func (c *Cache) Sync(ctx context.Context, storageID, spaceID string) error {
info, err := c.storage.Stat(ctx, jsonPath)
if err != nil {
if _, ok := err.(errtypes.NotFound); ok {
log.Debug().Msg("no json file, nothing to sync")
span.AddEvent("no file")
span.SetStatus(codes.Ok, "")
return nil // Nothing to sync against
}
if _, ok := err.(*os.PathError); ok {
log.Debug().Msg("no storage dir, nothing to sync")
span.AddEvent("no dir")
span.SetStatus(codes.Ok, "")
return nil // Nothing to sync against
}
span.SetStatus(codes.Error, fmt.Sprintf("Failed to stat the provider cache: %s", err.Error()))
log.Error().Err(err).Msg("Failed to stat the provider cache")
return err
}
// check mtime of /users/{userid}/created.json
if utils.TSToTime(info.Mtime).After(mtime) {
log.Debug().Msg("Updating provider cache...")
span.AddEvent("updating cache")
// - update cached list of created shares for the user in memory if changed
createdBlob, err := c.storage.SimpleDownload(ctx, jsonPath)
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Failed to download the provider cache: %s", err.Error()))
log.Error().Err(err).Msg("Failed to download the provider cache")
return err
}
newShares := &Shares{}
err = json.Unmarshal(createdBlob, newShares)
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Failed to unmarshal the provider cache: %s", err.Error()))
log.Error().Err(err).Msg("Failed to unmarshal the provider cache")
return err
}
newShares.Mtime = utils.TSToTime(info.Mtime)
c.initializeIfNeeded(storageID, spaceID)
c.Providers[storageID].Spaces[spaceID] = newShares
}
log.Debug().Msg("Provider cache is up to date")
span.SetStatus(codes.Ok, "")
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package receivedsharecache
import (
"context"
"encoding/json"
"fmt"
"path"
"path/filepath"
"time"
Expand All @@ -31,8 +32,13 @@ import (
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/storage/utils/metadata"
"github.com/cs3org/reva/v2/pkg/utils"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
)

// name is the Tracer name used to identify this instrumentation library.
const tracerName = "receivedsharecache"

// Cache stores the list of received shares and their states
// It functions as an in-memory cache with a persistence layer
// The storage is sharded by user
Expand Down Expand Up @@ -74,6 +80,10 @@ func New(s metadata.Storage, ttl time.Duration) Cache {

// Add adds a new entry to the cache
func (c *Cache) Add(ctx context.Context, userID, spaceID string, rs *collaboration.ReceivedShare) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Add")
defer span.End()
span.SetAttributes(attribute.String("cs3.userid", userID), attribute.String("cs3.spaceid", spaceID))

if c.ReceivedSpaces[userID] == nil {
c.ReceivedSpaces[userID] = &Spaces{
Spaces: map[string]*Space{},
Expand Down Expand Up @@ -106,13 +116,17 @@ func (c *Cache) Get(userID, spaceID, shareID string) *State {

// Sync updates the in-memory data with the data from the storage if it is outdated
func (c *Cache) Sync(ctx context.Context, userID string) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Sync")
defer span.End()
span.SetAttributes(attribute.String("cs3.userid", userID))

log := appctx.GetLogger(ctx).With().Str("userID", userID).Logger()
log.Debug().Msg("Syncing received share cache...")

var mtime time.Time
if c.ReceivedSpaces[userID] != nil {
if time.Now().Before(c.ReceivedSpaces[userID].nextSync) {
log.Debug().Msg("Skipping received share cache sync, it was just recently synced...")
span.AddEvent("skip sync")
span.SetStatus(codes.Ok, "")
return nil
}
c.ReceivedSpaces[userID].nextSync = time.Now().Add(c.ttl)
Expand All @@ -123,38 +137,47 @@ func (c *Cache) Sync(ctx context.Context, userID string) error {
}

jsonPath := userJSONPath(userID)
info, err := c.storage.Stat(ctx, jsonPath)
info, err := c.storage.Stat(ctx, jsonPath) // TODO we only need the mtime ... use fieldmask to make the request cheaper
if err != nil {
if _, ok := err.(errtypes.NotFound); ok {
span.AddEvent("no file")
span.SetStatus(codes.Ok, "")
return nil // Nothing to sync against
}
span.SetStatus(codes.Error, fmt.Sprintf("Failed to stat the received share: %s", err.Error()))
log.Error().Err(err).Msg("Failed to stat the received share")
return err
}
// check mtime of /users/{userid}/created.json
if utils.TSToTime(info.Mtime).After(mtime) {
log.Debug().Msg("Updating received share cache...")
span.AddEvent("updating cache")
// - update cached list of created shares for the user in memory if changed
createdBlob, err := c.storage.SimpleDownload(ctx, jsonPath)
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Failed to download the received share: %s", err.Error()))
log.Error().Err(err).Msg("Failed to download the received share")
return err
}
newSpaces := &Spaces{}
err = json.Unmarshal(createdBlob, newSpaces)
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Failed to unmarshal the received share: %s", err.Error()))
log.Error().Err(err).Msg("Failed to unmarshal the received share")
return err
}
newSpaces.Mtime = utils.TSToTime(info.Mtime)
c.ReceivedSpaces[userID] = newSpaces
}
log.Debug().Msg("Received share cache is up to date")
span.SetStatus(codes.Ok, "")
return nil
}

// Persist persists the data for one user to the storage
func (c *Cache) Persist(ctx context.Context, userID string) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Persist")
defer span.End()
span.SetAttributes(attribute.String("cs3.userid", userID))

if c.ReceivedSpaces[userID] == nil {
return nil
}
Expand Down
Loading