Skip to content

Commit

Permalink
enhancement: parallelization of jsoncs3 operations
Browse files Browse the repository at this point in the history
  • Loading branch information
fschade committed Jun 20, 2023
1 parent a74db9d commit a9d7b23
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 80 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: parallelization of jsoncs3 operations

Run removeShare and share create storage operations in parallel.

https://github.com/cs3org/reva/pull/3989
192 changes: 112 additions & 80 deletions pkg/share/manager/jsoncs3/jsoncs3.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ import (
"sync"
"time"

gatewayv1beta1 "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
rpcv1beta1 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/go-micro/plugins/v4/events/natsjs"
"github.com/google/uuid"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
Expand All @@ -37,11 +43,6 @@ import (
"golang.org/x/sync/errgroup"
"google.golang.org/genproto/protobuf/field_mask"

gatewayv1beta1 "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
rpcv1beta1 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
Expand All @@ -57,7 +58,6 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/metadata" // nolint:staticcheck // we need the legacy package to convert V1 to V2 messages
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/go-micro/plugins/v4/events/natsjs"
)

/*
Expand Down Expand Up @@ -333,83 +333,112 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla
Mtime: ts,
}

err = m.Cache.Add(ctx, md.Id.StorageId, md.Id.SpaceId, shareID, s)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := m.Cache.Sync(ctx, md.Id.StorageId, md.Id.SpaceId); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
err = m.Cache.Add(ctx, md.Id.StorageId, md.Id.SpaceId, shareID, s)
// TODO try more often?
}
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}

err = m.CreatedCache.Add(ctx, s.GetCreator().GetOpaqueId(), shareID)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := m.CreatedCache.Sync(ctx, s.GetCreator().GetOpaqueId()); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
err = m.CreatedCache.Add(ctx, s.GetCreator().GetOpaqueId(), shareID)
// TODO try more often?
}
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}

spaceID := md.Id.StorageId + shareid.IDDelimiter + md.Id.SpaceId
// set flag for grantee to have access to share
switch g.Grantee.Type {
case provider.GranteeType_GRANTEE_TYPE_USER:
userid := g.Grantee.GetUserId().GetOpaqueId()
eg, ctx := errgroup.WithContext(ctx)

rs := &collaboration.ReceivedShare{
Share: s,
State: collaboration.ShareState_SHARE_STATE_PENDING,
}
err = m.UserReceivedStates.Add(ctx, userid, spaceID, rs)
eg.Go(func() error {
err := m.Cache.Add(ctx, md.Id.StorageId, md.Id.SpaceId, shareID, s)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := m.UserReceivedStates.Sync(ctx, s.GetCreator().GetOpaqueId()); err != nil {
if err := m.Cache.Sync(ctx, md.Id.StorageId, md.Id.SpaceId); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err

return err
}
err = m.UserReceivedStates.Add(ctx, userid, spaceID, rs)

err = m.Cache.Add(ctx, md.Id.StorageId, md.Id.SpaceId, shareID, s)
// TODO try more often?
}

if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
case provider.GranteeType_GRANTEE_TYPE_GROUP:
groupid := g.Grantee.GetGroupId().GetOpaqueId()
err := m.GroupReceivedCache.Add(ctx, groupid, shareID)

return err
})

eg.Go(func() error {
err := m.CreatedCache.Add(ctx, s.GetCreator().GetOpaqueId(), shareID)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := m.GroupReceivedCache.Sync(ctx, groupid); err != nil {
if err := m.CreatedCache.Sync(ctx, s.GetCreator().GetOpaqueId()); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err

return err
}
err = m.GroupReceivedCache.Add(ctx, groupid, shareID)

err = m.CreatedCache.Add(ctx, s.GetCreator().GetOpaqueId(), shareID)
// TODO try more often?
}

if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}

return err
})

spaceID := md.Id.StorageId + shareid.IDDelimiter + md.Id.SpaceId
// set flag for grantee to have access to share
switch g.Grantee.Type {
case provider.GranteeType_GRANTEE_TYPE_USER:
eg.Go(func() error {
userid := g.Grantee.GetUserId().GetOpaqueId()

rs := &collaboration.ReceivedShare{
Share: s,
State: collaboration.ShareState_SHARE_STATE_PENDING,
}
err := m.UserReceivedStates.Add(ctx, userid, spaceID, rs)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := m.UserReceivedStates.Sync(ctx, s.GetCreator().GetOpaqueId()); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}

err = m.UserReceivedStates.Add(ctx, userid, spaceID, rs)
// TODO try more often?
}

if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}

return err
})
case provider.GranteeType_GRANTEE_TYPE_GROUP:
eg.Go(func() error {
groupid := g.Grantee.GetGroupId().GetOpaqueId()
err := m.GroupReceivedCache.Add(ctx, groupid, shareID)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := m.GroupReceivedCache.Sync(ctx, groupid); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}

err = m.GroupReceivedCache.Add(ctx, groupid, shareID)
// TODO try more often?
}

if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}

return err
})
}

if err = eg.Wait(); err != nil {
return nil, err
}

span.SetStatus(codes.Ok, "")

return s, nil
}

Expand Down Expand Up @@ -1087,33 +1116,36 @@ func (m *Manager) removeShare(ctx context.Context, s *collaboration.Share) error
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "removeShare")
defer span.End()

storageID, spaceID, _ := shareid.Decode(s.Id.OpaqueId)
err := m.Cache.Remove(ctx, storageID, spaceID, s.Id.OpaqueId)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := m.Cache.Sync(ctx, storageID, spaceID); err != nil {
return err
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
storageID, spaceID, _ := shareid.Decode(s.Id.OpaqueId)
err := m.Cache.Remove(ctx, storageID, spaceID, s.Id.OpaqueId)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := m.Cache.Sync(ctx, storageID, spaceID); err != nil {
return err
}
err = m.Cache.Remove(ctx, storageID, spaceID, s.Id.OpaqueId)
// TODO try more often?
}
err = m.Cache.Remove(ctx, storageID, spaceID, s.Id.OpaqueId)
// TODO try more often?
}
if err != nil {

return err
}
})

// remove from created cache
err = m.CreatedCache.Remove(ctx, s.GetCreator().GetOpaqueId(), s.Id.OpaqueId)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := m.CreatedCache.Sync(ctx, s.GetCreator().GetOpaqueId()); err != nil {
return err
eg.Go(func() error {
// remove from created cache
err := m.CreatedCache.Remove(ctx, s.GetCreator().GetOpaqueId(), s.Id.OpaqueId)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := m.CreatedCache.Sync(ctx, s.GetCreator().GetOpaqueId()); err != nil {
return err
}
err = m.CreatedCache.Remove(ctx, s.GetCreator().GetOpaqueId(), s.Id.OpaqueId)
// TODO try more often?
}
err = m.CreatedCache.Remove(ctx, s.GetCreator().GetOpaqueId(), s.Id.OpaqueId)
// TODO try more often?
}
if err != nil {

return err
}
})

// TODO remove from grantee cache

return nil
return eg.Wait()
}

0 comments on commit a9d7b23

Please sign in to comment.