From 36f7ec5ad54e16eb5ebb253fb0fc098df825748d Mon Sep 17 00:00:00 2001 From: MiniFrenchBread <103425574+MiniFrenchBread@users.noreply.github.com> Date: Wed, 7 Feb 2024 18:41:22 +0800 Subject: [PATCH] feat: disperse inconsecutive segments --- transfer/uploader.go | 5 +++-- transfer/uploader_parallel.go | 24 +++++++++++++++--------- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/transfer/uploader.go b/transfer/uploader.go index bca1169..5d2c405 100644 --- a/transfer/uploader.go +++ b/transfer/uploader.go @@ -313,6 +313,8 @@ func (uploader *Uploader) uploadFile(data core.IterableData, tree *merkle.Tree, offset := int64(segIndex * core.DefaultSegmentSize) + numSegments := (data.Size()-offset-1)/core.DefaultSegmentSize + 1 + numTasks := int(numSegments-1)/int(taskSize) + 1 segmentUploader := &SegmentUploader{ data: data, tree: tree, @@ -320,10 +322,9 @@ func (uploader *Uploader) uploadFile(data core.IterableData, tree *merkle.Tree, offset: offset, disperse: disperse, taskSize: taskSize, + numTasks: numTasks, } - numSegments := (data.Size()-offset-1)/core.DefaultSegmentSize + 1 - numTasks := int(numSegments-1)/int(taskSize) + 1 err := parallel.Serial(segmentUploader, numTasks, min(runtime.GOMAXPROCS(0), len(uploader.clients)*5), 0) if err != nil { return err diff --git a/transfer/uploader_parallel.go b/transfer/uploader_parallel.go index 3033ed3..910b246 100644 --- a/transfer/uploader_parallel.go +++ b/transfer/uploader_parallel.go @@ -16,6 +16,7 @@ type SegmentUploader struct { offset int64 disperse bool taskSize uint + numTasks int } var _ parallel.Interface = (*SegmentUploader)(nil) @@ -27,24 +28,26 @@ func (uploader *SegmentUploader) ParallelCollect(result *parallel.Result) error // ParallelDo implements parallel.Interface. func (uploader *SegmentUploader) ParallelDo(routine int, task int) (interface{}, error) { - offset := uploader.offset + int64(task)*int64(uploader.taskSize)*core.DefaultSegmentSize + offset := uploader.offset + int64(task)*core.DefaultSegmentSize numChunks := uploader.data.NumChunks() numSegments := uploader.data.NumSegments() segIndex := uint64(offset / core.DefaultSegmentSize) startSegIndex := segIndex segments := make([]node.SegmentWithProof, 0) for i := 0; i < int(uploader.taskSize); i++ { - // get segment - segment, err := core.ReadAt(uploader.data, core.DefaultSegmentSize, offset, uploader.data.PaddedSize()) - if err != nil { - return nil, err - } + // check segment index startIndex := segIndex * core.DefaultSegmentMaxChunks allDataUploaded := false if startIndex >= numChunks { // file real data already uploaded break - } else if startIndex+uint64(len(segment))/core.DefaultChunkSize >= numChunks { + } + // get segment + segment, err := core.ReadAt(uploader.data, core.DefaultSegmentSize, offset, uploader.data.PaddedSize()) + if err != nil { + return nil, err + } + if startIndex+uint64(len(segment))/core.DefaultChunkSize >= numChunks { // last segment has real data expectedLen := core.DefaultChunkSize * int(numChunks-startIndex) segment = segment[:expectedLen] @@ -63,8 +66,8 @@ func (uploader *SegmentUploader) ParallelDo(routine int, task int) (interface{}, if allDataUploaded { break } - segIndex++ - offset += core.DefaultSegmentSize + segIndex += uint64(uploader.numTasks) + offset += core.DefaultSegmentSize * int64(uploader.numTasks) } // upload if !uploader.disperse { @@ -80,6 +83,7 @@ func (uploader *SegmentUploader) ParallelDo(routine int, task int) (interface{}, "total": numSegments, "from_seg_index": startSegIndex, "to_seg_index": segIndex, + "step": uploader.numTasks, "clientIndex": clientIndex, }).Debug("Uploading segment to node..") if _, err := uploader.clients[clientIndex].UploadSegments(segments); err != nil && !isDuplicateError(err.Error()) { @@ -87,6 +91,7 @@ func (uploader *SegmentUploader) ParallelDo(routine int, task int) (interface{}, "total": numSegments, "from_seg_index": startSegIndex, "to_seg_index": segIndex, + "step": uploader.numTasks, "clientIndex": clientIndex, "error": err, }).Warn("Failed to upload segment to node, try next node..") @@ -108,6 +113,7 @@ func (uploader *SegmentUploader) ParallelDo(routine int, task int) (interface{}, "total": numSegments, "from_seg_index": startSegIndex, "to_seg_index": segIndex, + "step": uploader.numTasks, "root": core.SegmentRoot(segments[0].Data), }).Debug("Segments uploaded") }