Commit

feat: disperse inconsecutive segments
MiniFrenchBread committed Feb 7, 2024
1 parent 2adc81a commit 36f7ec5
Showing 2 changed files with 18 additions and 11 deletions.
transfer/uploader.go: 5 changes (3 additions, 2 deletions)
@@ -313,17 +313,18 @@ func (uploader *Uploader) uploadFile(data core.IterableData, tree *merkle.Tree,
 
 	offset := int64(segIndex * core.DefaultSegmentSize)
 
+	numSegments := (data.Size()-offset-1)/core.DefaultSegmentSize + 1
+	numTasks := int(numSegments-1)/int(taskSize) + 1
 	segmentUploader := &SegmentUploader{
 		data:     data,
 		tree:     tree,
 		clients:  uploader.clients,
 		offset:   offset,
 		disperse: disperse,
 		taskSize: taskSize,
+		numTasks: numTasks,
 	}
 
-	numSegments := (data.Size()-offset-1)/core.DefaultSegmentSize + 1
-	numTasks := int(numSegments-1)/int(taskSize) + 1
 	err := parallel.Serial(segmentUploader, numTasks, min(runtime.GOMAXPROCS(0), len(uploader.clients)*5), 0)
 	if err != nil {
 		return err
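The two moved lines are plain ceiling divisions: numSegments counts the segments remaining from offset, and numTasks counts how many groups of taskSize segments are needed to cover them. Hoisting them above the struct literal lets numTasks be stored on the SegmentUploader, which the new code below uses as a stride. A minimal sketch of the arithmetic with illustrative values (the constant and variable names here are assumptions, not taken from the repository):

package main

import "fmt"

// Illustrative stand-in for core.DefaultSegmentSize (assumed 256 KiB here).
const defaultSegmentSize = 256 * 1024

func main() {
	var (
		dataSize int64 = 3 * 1024 * 1024 // example: 3 MiB payload
		offset   int64 = 0               // example: upload starts at the first segment
		taskSize uint  = 4               // example: segments attempted per task
	)

	// Ceiling division: segments remaining from offset to the end of the data.
	numSegments := (dataSize-offset-1)/defaultSegmentSize + 1
	// Ceiling division: tasks of taskSize segments needed to cover them.
	numTasks := int(numSegments-1)/int(taskSize) + 1

	fmt.Println(numSegments, numTasks) // prints: 12 3
}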
transfer/uploader_parallel.go: 24 changes (15 additions, 9 deletions)
@@ -16,6 +16,7 @@ type SegmentUploader struct {
 	offset   int64
 	disperse bool
 	taskSize uint
+	numTasks int
 }
 
 var _ parallel.Interface = (*SegmentUploader)(nil)
@@ -27,24 +28,26 @@ func (uploader *SegmentUploader) ParallelCollect(result *parallel.Result) error
 
 // ParallelDo implements parallel.Interface.
 func (uploader *SegmentUploader) ParallelDo(routine int, task int) (interface{}, error) {
-	offset := uploader.offset + int64(task)*int64(uploader.taskSize)*core.DefaultSegmentSize
+	offset := uploader.offset + int64(task)*core.DefaultSegmentSize
 	numChunks := uploader.data.NumChunks()
 	numSegments := uploader.data.NumSegments()
 	segIndex := uint64(offset / core.DefaultSegmentSize)
 	startSegIndex := segIndex
 	segments := make([]node.SegmentWithProof, 0)
 	for i := 0; i < int(uploader.taskSize); i++ {
-		// get segment
-		segment, err := core.ReadAt(uploader.data, core.DefaultSegmentSize, offset, uploader.data.PaddedSize())
-		if err != nil {
-			return nil, err
-		}
 		// check segment index
 		startIndex := segIndex * core.DefaultSegmentMaxChunks
 		allDataUploaded := false
 		if startIndex >= numChunks {
 			// file real data already uploaded
 			break
-		} else if startIndex+uint64(len(segment))/core.DefaultChunkSize >= numChunks {
+		}
+		// get segment
+		segment, err := core.ReadAt(uploader.data, core.DefaultSegmentSize, offset, uploader.data.PaddedSize())
+		if err != nil {
+			return nil, err
+		}
+		if startIndex+uint64(len(segment))/core.DefaultChunkSize >= numChunks {
 			// last segment has real data
 			expectedLen := core.DefaultChunkSize * int(numChunks-startIndex)
 			segment = segment[:expectedLen]
@@ -63,8 +66,8 @@ func (uploader *SegmentUploader) ParallelDo(routine int, task int) (interface{},
 		if allDataUploaded {
 			break
 		}
-		segIndex++
-		offset += core.DefaultSegmentSize
+		segIndex += uint64(uploader.numTasks)
+		offset += core.DefaultSegmentSize * int64(uploader.numTasks)
 	}
 	// upload
 	if !uploader.disperse {
@@ -80,13 +83,15 @@ func (uploader *SegmentUploader) ParallelDo(routine int, task int) (interface{},
 			"total":          numSegments,
 			"from_seg_index": startSegIndex,
 			"to_seg_index":   segIndex,
+			"step":           uploader.numTasks,
 			"clientIndex":    clientIndex,
 		}).Debug("Uploading segment to node..")
 		if _, err := uploader.clients[clientIndex].UploadSegments(segments); err != nil && !isDuplicateError(err.Error()) {
 			logrus.WithFields(logrus.Fields{
 				"total":          numSegments,
 				"from_seg_index": startSegIndex,
 				"to_seg_index":   segIndex,
+				"step":           uploader.numTasks,
 				"clientIndex":    clientIndex,
 				"error":          err,
 			}).Warn("Failed to upload segment to node, try next node..")
@@ -108,6 +113,7 @@ func (uploader *SegmentUploader) ParallelDo(routine int, task int) (interface{},
 			"total":          numSegments,
 			"from_seg_index": startSegIndex,
 			"to_seg_index":   segIndex,
+			"step":           uploader.numTasks,
 			"root":           core.SegmentRoot(segments[0].Data),
 		}).Debug("Segments uploaded")
 	}
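The net effect in ParallelDo is a switch from contiguous to interleaved (strided) segment assignment: task t now starts at segment t and advances by numTasks on every iteration, so with disperse enabled neighbouring segments end up in different tasks and therefore on different nodes, while each task still uploads at most taskSize segments. A rough standalone sketch of the two assignment schemes (all values are examples, and the loops below are independent of the uploader types):

package main

import "fmt"

func main() {
	const (
		numSegments = 10 // example: total segments in the file
		taskSize    = 4  // example: segments handled per task
	)
	numTasks := (numSegments-1)/taskSize + 1 // 3 tasks

	// Old scheme: task t uploads the contiguous block starting at t*taskSize.
	for t := 0; t < numTasks; t++ {
		var segs []int
		for i, seg := 0, t*taskSize; i < taskSize && seg < numSegments; i, seg = i+1, seg+1 {
			segs = append(segs, seg)
		}
		fmt.Println("contiguous task", t, segs)
	}

	// New scheme: task t starts at segment t and strides by numTasks,
	// dispersing consecutive segments across tasks.
	for t := 0; t < numTasks; t++ {
		var segs []int
		for i, seg := 0, t; i < taskSize && seg < numSegments; i, seg = i+1, seg+numTasks {
			segs = append(segs, seg)
		}
		fmt.Println("strided task", t, segs)
	}
}

Both schemes cover every segment exactly once; the strided one simply spreads each task's batch across the whole file instead of one contiguous range, which is what the added "step" field in the upload logs reports.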
