Skip to content

Commit

Permalink
layer: Fix multipart
Browse files Browse the repository at this point in the history
Re-upload each small part that was previously sliced manually.

Signed-off-by: Evgenii Baidakov <[email protected]>
  • Loading branch information
smallhive committed Dec 10, 2024
1 parent 698e232 commit f8e99a3
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 47 deletions.
124 changes: 86 additions & 38 deletions api/layer/multipart_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/nspcc-dev/neofs-sdk-go/version"
"github.com/nspcc-dev/tzhash/tz"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

const (
Expand Down Expand Up @@ -595,14 +596,89 @@ func (n *layer) reUploadFollowingParts(ctx context.Context, uploadParams UploadP
for _, part := range parts {
uploadParams.PartNumber = part.Number

if err = n.reUploadPart(ctx, uploadParams, part.OID, bktInfo, multipartInfo); err != nil {
return fmt.Errorf("reupload number=%d: %w", part.Number, err)
if len(part.Elements) > 0 {
if err = n.reUploadSegmentedPart(ctx, uploadParams, part, bktInfo, multipartInfo); err != nil {
return fmt.Errorf("reupload number=%d: %w", part.Number, err)
}
} else {
if err = n.reUploadPart(ctx, uploadParams, part.OID, bktInfo, multipartInfo); err != nil {
return fmt.Errorf("reupload number=%d: %w", part.Number, err)
}
}
}

return nil
}

// reUploadSegmentedPart re-uploads a part that was manually sliced into
// several element objects. The element payloads are streamed through an
// io.Pipe into uploadPart so that the whole part is stored again as a single
// object; the obsolete element objects are removed along the way and the old
// part object is deleted once the new upload succeeds.
func (n *layer) reUploadSegmentedPart(ctx context.Context, uploadParams UploadPartParams, part *data.PartInfo, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {
	pipeReader, pipeWriter := io.Pipe()

	eg, ctx := errgroup.WithContext(ctx)

	// Producer: concatenate element payloads into the pipe.
	eg.Go(func() error {
		var err error
		// Always finish the pipe, whatever path we leave on. A non-nil err
		// propagates to the consumer's Read so uploadPart unblocks instead of
		// hanging forever (io.Pipe does not observe ctx cancellation);
		// CloseWithError(nil) is equivalent to Close, i.e. the reader gets
		// io.EOF after the last byte.
		defer func() {
			_ = pipeWriter.CloseWithError(err)
		}()

		for _, element := range part.Elements {
			elementObj, getErr := n.objectGet(ctx, bktInfo, element.OID)
			if getErr != nil {
				err = fmt.Errorf("get part oid=%s, element oid=%s: %w", part.OID.String(), element.OID.String(), getErr)
				return err
			}

			if _, writeErr := pipeWriter.Write(elementObj.Payload()); writeErr != nil {
				err = fmt.Errorf("write part oid=%s, element oid=%s: %w", part.OID.String(), element.OID.String(), writeErr)
				return err
			}

			// The part lists itself among its Split-chain elements. It must
			// not be removed here; MultipartComplete is responsible for that.
			if part.OID == element.OID {
				continue
			}

			if delErr := n.objectDelete(ctx, bktInfo, element.OID); delErr != nil {
				// Best effort: a leftover element object is not fatal for the
				// re-upload, so log and keep going.
				n.log.Error(
					"couldn't delete object",
					zap.Error(delErr),
					zap.String("cnrID", bktInfo.CID.EncodeToString()),
					zap.String("uploadID", multipartInfo.UploadID),
					zap.Int("partNumber", part.Number),
					zap.String("part.OID", part.OID.String()),
					zap.String("part element OID", element.OID.String()),
				)
			}
		}

		return nil
	})

	// Consumer: re-upload the concatenated payload as one part object.
	eg.Go(func() error {
		uploadParams.Size = part.Size
		uploadParams.Reader = pipeReader

		n.log.Debug("reUploadPart", zap.String("oid", part.OID.String()), zap.Int64("payload size", uploadParams.Size))
		if _, err := n.uploadPart(ctx, multipartInfo, &uploadParams); err != nil {
			// Unblock a possibly pending Write on the producer side so
			// eg.Wait cannot deadlock on a stuck goroutine.
			_ = pipeReader.CloseWithError(err)
			return fmt.Errorf("upload id=%s: %w", part.OID.String(), err)
		}

		return nil
	})

	if err := eg.Wait(); err != nil {
		return fmt.Errorf("upload part oid=%s: %w", part.OID.String(), err)
	}

	// The data now lives in the freshly uploaded object; drop the old one.
	if err := n.objectDelete(ctx, bktInfo, part.OID); err != nil {
		return fmt.Errorf("delete old id=%s: %w", part.OID.String(), err)
	}

	return nil
}

func (n *layer) reUploadPart(ctx context.Context, uploadParams UploadPartParams, id oid.ID, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {
obj, err := n.objectGet(ctx, bktInfo, id)
if err != nil {
Expand Down Expand Up @@ -1254,27 +1330,12 @@ func (n *layer) manualSlice(ctx context.Context, bktInfo *data.BucketInfo, prm P
// uploadPartAsSlot uploads multipart part, but without correct link to previous part because we don't have it.
// It uses zero part as pivot. Actual link will be set on CompleteMultipart.
func (n *layer) uploadPartAsSlot(ctx context.Context, params uploadPartAsSlotParams) (*data.ObjectInfo, error) {
zeroPart, err := n.treeService.GetPartByNumber(ctx, params.bktInfo, params.multipartInfo.ID, 0)
if err != nil {
return nil, fmt.Errorf("get part by number: %w", err)
}

var (
id oid.ID
chunk *[]byte
elements []data.LinkObjectPayload
isReturnToPool bool
splitFirstID = zeroPart.OID
splitPreviousID = zeroPart.OID
multipartHash = sha256.New()
currentPartHash = sha256.New()
id oid.ID
elements []data.LinkObjectPayload
multipartHash = sha256.New()
)

objHashes := []hash.Hash{multipartHash, currentPartHash}
if params.tzHash != nil {
objHashes = append(objHashes, params.tzHash)
}

params.attributes = append(params.attributes,
[2]string{headerS3MultipartUpload, params.multipartInfo.UploadID},
[2]string{headerS3MultipartNumber, strconv.FormatInt(int64(params.uploadPartParams.PartNumber), 10)},
Expand All @@ -1287,26 +1348,13 @@ func (n *layer) uploadPartAsSlot(ctx context.Context, params uploadPartAsSlotPar
Attributes: params.attributes,
CreationTime: params.creationTime,
CopiesNumber: params.multipartInfo.CopiesNumber,
Multipart: &Multipart{
MultipartHashes: objHashes,
},
}

if params.uploadPartParams.Size > n.neoFS.MaxObjectSize()/2 {
chunk = n.buffers.Get().(*[]byte)
isReturnToPool = true
} else {
smallChunk := make([]byte, params.uploadPartParams.Size)
chunk = &smallChunk
}

id, elements, err = n.manualSlice(ctx, params.bktInfo, prm, splitFirstID, splitPreviousID, *chunk, params.payloadReader)
if isReturnToPool {
n.buffers.Put(chunk)
Payload: params.payloadReader,
PayloadSize: uint64(params.decSize),
}

id, objHashBts, err := n.objectPutAndHash(ctx, prm, params.bktInfo)
if err != nil {
return nil, fmt.Errorf("manual slice: %w", err)
return nil, fmt.Errorf("object put and hash: %w", err)
}

partInfo := &data.PartInfo{
Expand All @@ -1315,7 +1363,7 @@ func (n *layer) uploadPartAsSlot(ctx context.Context, params uploadPartAsSlotPar
Number: params.uploadPartParams.PartNumber,
OID: id,
Size: params.decSize,
ETag: hex.EncodeToString(currentPartHash.Sum(nil)),
ETag: hex.EncodeToString(objHashBts),
Created: prm.CreationTime,
Elements: elements,
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/urfave/cli/v2 v2.27.4
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.26.0
golang.org/x/sync v0.8.0
google.golang.org/grpc v1.66.0
google.golang.org/protobuf v1.34.2
)
Expand All @@ -47,7 +48,6 @@ require (
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
go.etcd.io/bbolt v1.3.11 // indirect
golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 // indirect
golang.org/x/sync v0.8.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed // indirect
)

Expand Down
18 changes: 10 additions & 8 deletions internal/neofs/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,15 +284,17 @@ func newPartInfo(node NodeResponse) (*data.PartInfo, error) {
case homoHashKV:
partInfo.HomoHash = []byte(value)
case elementsKV:
elements := strings.Split(value, ",")
partInfo.Elements = make([]data.LinkObjectPayload, len(elements))
for i, e := range elements {
var element data.LinkObjectPayload
if err = element.Unmarshal(e); err != nil {
return nil, fmt.Errorf("invalid element: %w", err)
if value != "" {
elements := strings.Split(value, ",")
partInfo.Elements = make([]data.LinkObjectPayload, len(elements))
for i, e := range elements {
var element data.LinkObjectPayload
if err = element.Unmarshal(e); err != nil {
return nil, fmt.Errorf("invalid element: %w", err)
}

partInfo.Elements[i] = element
}

partInfo.Elements[i] = element
}
}
}
Expand Down

0 comments on commit f8e99a3

Please sign in to comment.