Skip to content

Commit

Permalink
add trace details to jsoncs3
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <[email protected]>
  • Loading branch information
butonic committed Jun 8, 2023
1 parent 2fe516b commit bd91480
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 15 deletions.
3 changes: 3 additions & 0 deletions changelog/unreleased/jsoncs3-trace-span-details.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Bugfix: add trace span details

https://github.com/cs3org/reva/pull/3960
78 changes: 63 additions & 15 deletions pkg/share/manager/jsoncs3/jsoncs3.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel/codes"
"golang.org/x/sync/errgroup"
"google.golang.org/genproto/protobuf/field_mask"

Expand Down Expand Up @@ -230,46 +231,61 @@ func New(s metadata.Storage, gc gatewayv1beta1.GatewayAPIClient, ttlSeconds int,
}, nil
}

func (m *Manager) initialize() error {
func (m *Manager) initialize(ctx context.Context) error {
_, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "initialize")
defer span.End()
if m.initialized {
span.SetStatus(codes.Ok, "already initialized")
return nil
}

m.Lock()
defer m.Unlock()

if m.initialized { // check if initialization happened while grabbing the lock
span.SetStatus(codes.Ok, "initialized while grabbing lock")
return nil
}

ctx := context.Background()
ctx = context.Background()
err := m.storage.Init(ctx, "jsoncs3-share-manager-metadata")
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}

err = m.storage.MakeDirIfNotExist(ctx, "storages")
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
err = m.storage.MakeDirIfNotExist(ctx, "users")
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
err = m.storage.MakeDirIfNotExist(ctx, "groups")
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}

m.initialized = true
span.SetStatus(codes.Ok, "initialized")
return nil
}

// Share creates a new share
func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *collaboration.ShareGrant) (*collaboration.Share, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Share")
defer span.End()
if err := m.initialize(); err != nil {
if err := m.initialize(ctx); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}

Expand All @@ -280,7 +296,10 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla
// TODO: should this not already be caught at the gw level?
if g.Grantee.Type == provider.GranteeType_GRANTEE_TYPE_USER &&
(utils.UserEqual(g.Grantee.GetUserId(), user.Id) || utils.UserEqual(g.Grantee.GetUserId(), md.Owner)) {
return nil, errtypes.BadRequest("jsoncs3: owner/creator and grantee are the same")
err := errtypes.BadRequest("jsoncs3: owner/creator and grantee are the same")
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}

// check if share already exists.
Expand All @@ -295,7 +314,10 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla
_, err := m.getByKey(ctx, key)
if err == nil {
// share already exists
return nil, errtypes.AlreadyExists(key.String())
err := errtypes.AlreadyExists(key.String())
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}

shareID := shareid.Encode(md.GetId().GetStorageId(), md.GetId().GetSpaceId(), uuid.NewString())
Expand All @@ -316,24 +338,32 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla
err = m.Cache.Add(ctx, md.Id.StorageId, md.Id.SpaceId, shareID, s)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := m.Cache.Sync(ctx, md.Id.StorageId, md.Id.SpaceId); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
err = m.Cache.Add(ctx, md.Id.StorageId, md.Id.SpaceId, shareID, s)
// TODO try more often?
}
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}

err = m.CreatedCache.Add(ctx, s.GetCreator().GetOpaqueId(), shareID)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := m.CreatedCache.Sync(ctx, s.GetCreator().GetOpaqueId()); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
err = m.CreatedCache.Add(ctx, s.GetCreator().GetOpaqueId(), shareID)
// TODO try more often?
}
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}

Expand All @@ -350,29 +380,38 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla
err = m.UserReceivedStates.Add(ctx, userid, spaceID, rs)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := m.UserReceivedStates.Sync(ctx, s.GetCreator().GetOpaqueId()); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
err = m.UserReceivedStates.Add(ctx, userid, spaceID, rs)
// TODO try more often?
}
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
case provider.GranteeType_GRANTEE_TYPE_GROUP:
groupid := g.Grantee.GetGroupId().GetOpaqueId()
err := m.GroupReceivedCache.Add(ctx, groupid, shareID)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := m.GroupReceivedCache.Sync(ctx, groupid); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
err = m.GroupReceivedCache.Add(ctx, groupid, shareID)
// TODO try more often?
}
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
}

span.SetStatus(codes.Ok, "")
return s, nil
}

Expand Down Expand Up @@ -425,7 +464,7 @@ func (m *Manager) get(ctx context.Context, ref *collaboration.ShareReference) (s
func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReference) (*collaboration.Share, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "GetShare")
defer span.End()
if err := m.initialize(); err != nil {
if err := m.initialize(ctx); err != nil {
return nil, err
}

Expand Down Expand Up @@ -478,14 +517,14 @@ func (m *Manager) Unshare(ctx context.Context, ref *collaboration.ShareReference
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Unshare")
defer span.End()

if err := m.initialize(); err != nil {
if err := m.initialize(ctx); err != nil {
return err
}

m.Lock()
defer m.Unlock()
user := ctxpkg.ContextMustGetUser(ctx)

m.Lock()
defer m.Unlock()
s, err := m.get(ctx, ref)
if err != nil {
return err
Expand All @@ -504,7 +543,7 @@ func (m *Manager) UpdateShare(ctx context.Context, ref *collaboration.ShareRefer
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "UpdateShare")
defer span.End()

if err := m.initialize(); err != nil {
if err := m.initialize(ctx); err != nil {
return nil, err
}

Expand Down Expand Up @@ -586,7 +625,7 @@ func (m *Manager) ListShares(ctx context.Context, filters []*collaboration.Filte
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "ListShares")
defer span.End()

if err := m.initialize(); err != nil {
if err := m.initialize(ctx); err != nil {
return nil, err
}

Expand Down Expand Up @@ -622,6 +661,8 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f
for spaceID := range spaces {
err := m.Cache.Sync(ctx, providerID, spaceID)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}

Expand Down Expand Up @@ -669,6 +710,7 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f
}
}
}
span.SetStatus(codes.Ok, "")
return ss, nil
}

Expand All @@ -679,6 +721,8 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User,
var ss []*collaboration.Share

if err := m.CreatedCache.Sync(ctx, user.Id.OpaqueId); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return ss, err
}
for ssid, spaceShareIDs := range m.CreatedCache.List(user.Id.OpaqueId) {
Expand Down Expand Up @@ -718,6 +762,7 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User,
}
}

span.SetStatus(codes.Ok, "")
return ss, nil
}

Expand All @@ -726,7 +771,7 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "ListReceivedShares")
defer span.End()

if err := m.initialize(); err != nil {
if err := m.initialize(ctx); err != nil {
return nil, err
}

Expand Down Expand Up @@ -870,9 +915,12 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
}

if err := g.Wait(); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}

span.SetStatus(codes.Ok, "")
return rss, nil
}

Expand All @@ -899,7 +947,7 @@ func (m *Manager) convert(ctx context.Context, userID string, s *collaboration.S

// GetReceivedShare returns the information for a received share.
func (m *Manager) GetReceivedShare(ctx context.Context, ref *collaboration.ShareReference) (*collaboration.ReceivedShare, error) {
if err := m.initialize(); err != nil {
if err := m.initialize(ctx); err != nil {
return nil, err
}

Expand Down Expand Up @@ -944,7 +992,7 @@ func (m *Manager) UpdateReceivedShare(ctx context.Context, receivedShare *collab
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "UpdateReceivedShare")
defer span.End()

if err := m.initialize(); err != nil {
if err := m.initialize(ctx); err != nil {
return nil, err
}

Expand Down Expand Up @@ -997,7 +1045,7 @@ func updateShareID(share *collaboration.Share) {
// Load imports shares and received shares from channels (e.g. during migration)
func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Share, receivedShareChan <-chan share.ReceivedShareWithUser) error {
log := appctx.GetLogger(ctx)
if err := m.initialize(); err != nil {
if err := m.initialize(ctx); err != nil {
return err
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/share/manager/jsoncs3/providercache/providercache.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func (c *Cache) PersistWithTime(ctx context.Context, storageID, spaceID string,
span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID))

if c.Providers[storageID] == nil || c.Providers[storageID].Spaces[spaceID] == nil {
span.SetStatus(codes.Ok, "no shares in provider or space")
return nil
}

Expand All @@ -178,11 +179,15 @@ func (c *Cache) PersistWithTime(ctx context.Context, storageID, spaceID string,
createdBytes, err := json.Marshal(c.Providers[storageID].Spaces[spaceID])
if err != nil {
c.Providers[storageID].Spaces[spaceID].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
jsonPath := spaceJSONPath(storageID, spaceID)
if err := c.storage.MakeDirIfNotExist(ctx, path.Dir(jsonPath)); err != nil {
c.Providers[storageID].Spaces[spaceID].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}

Expand All @@ -192,8 +197,11 @@ func (c *Cache) PersistWithTime(ctx context.Context, storageID, spaceID string,
IfUnmodifiedSince: c.Providers[storageID].Spaces[spaceID].Mtime,
}); err != nil {
c.Providers[storageID].Spaces[spaceID].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
span.SetStatus(codes.Ok, "")
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func (c *Cache) Persist(ctx context.Context, userID string) error {
span.SetAttributes(attribute.String("cs3.userid", userID))

if c.ReceivedSpaces[userID] == nil {
span.SetStatus(codes.Ok, "no received shares")
return nil
}

Expand All @@ -188,11 +189,15 @@ func (c *Cache) Persist(ctx context.Context, userID string) error {
createdBytes, err := json.Marshal(c.ReceivedSpaces[userID])
if err != nil {
c.ReceivedSpaces[userID].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
jsonPath := userJSONPath(userID)
if err := c.storage.MakeDirIfNotExist(ctx, path.Dir(jsonPath)); err != nil {
c.ReceivedSpaces[userID].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}

Expand All @@ -202,8 +207,11 @@ func (c *Cache) Persist(ctx context.Context, userID string) error {
IfUnmodifiedSince: c.ReceivedSpaces[userID].Mtime,
}); err != nil {
c.ReceivedSpaces[userID].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
span.SetStatus(codes.Ok, "")
return nil
}

Expand Down
Loading

0 comments on commit bd91480

Please sign in to comment.