diff --git a/changelog/2.14.0_2023-06-05/jsoncs3-operation-parallelization.md b/changelog/2.14.0_2023-06-05/jsoncs3-operation-parallelization.md new file mode 100644 index 00000000000..e273c00db1e --- /dev/null +++ b/changelog/2.14.0_2023-06-05/jsoncs3-operation-parallelization.md @@ -0,0 +1,5 @@ +Enhancement: parallelization of jsoncs3 operations + +Run removeShare and share create storage operations in parallel. + +https://github.com/cs3org/reva/pull/3989 diff --git a/pkg/share/manager/jsoncs3/jsoncs3.go b/pkg/share/manager/jsoncs3/jsoncs3.go index c5587e76199..7623daa67af 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3.go +++ b/pkg/share/manager/jsoncs3/jsoncs3.go @@ -42,6 +42,8 @@ import ( rpcv1beta1 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/go-micro/plugins/v4/events/natsjs" + "github.com/cs3org/reva/v2/pkg/appctx" ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" "github.com/cs3org/reva/v2/pkg/errtypes" @@ -57,7 +59,6 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/utils/metadata" // nolint:staticcheck // we need the legacy package to convert V1 to V2 messages "github.com/cs3org/reva/v2/pkg/storagespace" "github.com/cs3org/reva/v2/pkg/utils" - "github.com/go-micro/plugins/v4/events/natsjs" ) /* @@ -333,83 +334,112 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla Mtime: ts, } - err = m.Cache.Add(ctx, md.Id.StorageId, md.Id.SpaceId, shareID, s) - if _, ok := err.(errtypes.IsPreconditionFailed); ok { - if err := m.Cache.Sync(ctx, md.Id.StorageId, md.Id.SpaceId); err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - return nil, err - } - err = m.Cache.Add(ctx, md.Id.StorageId, md.Id.SpaceId, shareID, s) - // TODO try more often? - } - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - return nil, err - } - - err = m.CreatedCache.Add(ctx, s.GetCreator().GetOpaqueId(), shareID) - if _, ok := err.(errtypes.IsPreconditionFailed); ok { - if err := m.CreatedCache.Sync(ctx, s.GetCreator().GetOpaqueId()); err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - return nil, err - } - err = m.CreatedCache.Add(ctx, s.GetCreator().GetOpaqueId(), shareID) - // TODO try more often? - } - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - return nil, err - } + eg, ctx := errgroup.WithContext(ctx) - spaceID := md.Id.StorageId + shareid.IDDelimiter + md.Id.SpaceId - // set flag for grantee to have access to share - switch g.Grantee.Type { - case provider.GranteeType_GRANTEE_TYPE_USER: - userid := g.Grantee.GetUserId().GetOpaqueId() - - rs := &collaboration.ReceivedShare{ - Share: s, - State: collaboration.ShareState_SHARE_STATE_PENDING, - } - err = m.UserReceivedStates.Add(ctx, userid, spaceID, rs) + eg.Go(func() error { + err := m.Cache.Add(ctx, md.Id.StorageId, md.Id.SpaceId, shareID, s) if _, ok := err.(errtypes.IsPreconditionFailed); ok { - if err := m.UserReceivedStates.Sync(ctx, s.GetCreator().GetOpaqueId()); err != nil { + if err := m.Cache.Sync(ctx, md.Id.StorageId, md.Id.SpaceId); err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) - return nil, err + + return err } - err = m.UserReceivedStates.Add(ctx, userid, spaceID, rs) + + err = m.Cache.Add(ctx, md.Id.StorageId, md.Id.SpaceId, shareID, s) // TODO try more often? } + if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) - return nil, err } - case provider.GranteeType_GRANTEE_TYPE_GROUP: - groupid := g.Grantee.GetGroupId().GetOpaqueId() - err := m.GroupReceivedCache.Add(ctx, groupid, shareID) + + return err + }) + + eg.Go(func() error { + err := m.CreatedCache.Add(ctx, s.GetCreator().GetOpaqueId(), shareID) if _, ok := err.(errtypes.IsPreconditionFailed); ok { - if err := m.GroupReceivedCache.Sync(ctx, groupid); err != nil { + if err := m.CreatedCache.Sync(ctx, s.GetCreator().GetOpaqueId()); err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) - return nil, err + + return err } - err = m.GroupReceivedCache.Add(ctx, groupid, shareID) + + err = m.CreatedCache.Add(ctx, s.GetCreator().GetOpaqueId(), shareID) // TODO try more often? } + if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) - return nil, err } + + return err + }) + + spaceID := md.Id.StorageId + shareid.IDDelimiter + md.Id.SpaceId + // set flag for grantee to have access to share + switch g.Grantee.Type { + case provider.GranteeType_GRANTEE_TYPE_USER: + eg.Go(func() error { + userid := g.Grantee.GetUserId().GetOpaqueId() + + rs := &collaboration.ReceivedShare{ + Share: s, + State: collaboration.ShareState_SHARE_STATE_PENDING, + } + err := m.UserReceivedStates.Add(ctx, userid, spaceID, rs) + if _, ok := err.(errtypes.IsPreconditionFailed); ok { + if err := m.UserReceivedStates.Sync(ctx, s.GetCreator().GetOpaqueId()); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + err = m.UserReceivedStates.Add(ctx, userid, spaceID, rs) + // TODO try more often? + } + + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return err + }) + case provider.GranteeType_GRANTEE_TYPE_GROUP: + eg.Go(func() error { + groupid := g.Grantee.GetGroupId().GetOpaqueId() + err := m.GroupReceivedCache.Add(ctx, groupid, shareID) + if _, ok := err.(errtypes.IsPreconditionFailed); ok { + if err := m.GroupReceivedCache.Sync(ctx, groupid); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + err = m.GroupReceivedCache.Add(ctx, groupid, shareID) + // TODO try more often? + } + + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return err + }) + } + + if err = eg.Wait(); err != nil { + return nil, err } span.SetStatus(codes.Ok, "") + return s, nil } @@ -1087,33 +1117,36 @@ func (m *Manager) removeShare(ctx context.Context, s *collaboration.Share) error ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "removeShare") defer span.End() - storageID, spaceID, _ := shareid.Decode(s.Id.OpaqueId) - err := m.Cache.Remove(ctx, storageID, spaceID, s.Id.OpaqueId) - if _, ok := err.(errtypes.IsPreconditionFailed); ok { - if err := m.Cache.Sync(ctx, storageID, spaceID); err != nil { - return err + eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { + storageID, spaceID, _ := shareid.Decode(s.Id.OpaqueId) + err := m.Cache.Remove(ctx, storageID, spaceID, s.Id.OpaqueId) + if _, ok := err.(errtypes.IsPreconditionFailed); ok { + if err := m.Cache.Sync(ctx, storageID, spaceID); err != nil { + return err + } + err = m.Cache.Remove(ctx, storageID, spaceID, s.Id.OpaqueId) + // TODO try more often? } - err = m.Cache.Remove(ctx, storageID, spaceID, s.Id.OpaqueId) - // TODO try more often? - } - if err != nil { + return err - } + }) - // remove from created cache - err = m.CreatedCache.Remove(ctx, s.GetCreator().GetOpaqueId(), s.Id.OpaqueId) - if _, ok := err.(errtypes.IsPreconditionFailed); ok { - if err := m.CreatedCache.Sync(ctx, s.GetCreator().GetOpaqueId()); err != nil { - return err + eg.Go(func() error { + // remove from created cache + err := m.CreatedCache.Remove(ctx, s.GetCreator().GetOpaqueId(), s.Id.OpaqueId) + if _, ok := err.(errtypes.IsPreconditionFailed); ok { + if err := m.CreatedCache.Sync(ctx, s.GetCreator().GetOpaqueId()); err != nil { + return err + } + err = m.CreatedCache.Remove(ctx, s.GetCreator().GetOpaqueId(), s.Id.OpaqueId) + // TODO try more often? } - err = m.CreatedCache.Remove(ctx, s.GetCreator().GetOpaqueId(), s.Id.OpaqueId) - // TODO try more often? - } - if err != nil { + return err - } + }) // TODO remove from grantee cache - return nil + return eg.Wait() }