diff --git a/cmd/download.go b/cmd/download.go index 6ac7ef6..018f23a 100644 --- a/cmd/download.go +++ b/cmd/download.go @@ -37,7 +37,10 @@ func init() { func download(*cobra.Command, []string) { nodes := node.MustNewClients(downloadArgs.nodes) - downloader := transfer.NewDownloader(nodes...) + downloader, err := transfer.NewDownloader(nodes...) + if err != nil { + logrus.WithError(err).Fatal("Failed to initialize downloader") + } if err := downloader.Download(downloadArgs.root, downloadArgs.file, downloadArgs.proof); err != nil { logrus.WithError(err).Fatal("Failed to download file") diff --git a/gateway/local_apis.go b/gateway/local_apis.go index b126600..e0e5a78 100644 --- a/gateway/local_apis.go +++ b/gateway/local_apis.go @@ -150,7 +150,10 @@ func downloadFileLocal(c *gin.Context) (interface{}, error) { return nil, ErrValidation.WithData("node index out of bound") } - downloader := transfer.NewDownloader(allClients[input.Node]) + downloader, err := transfer.NewDownloader(allClients[input.Node]) + if err != nil { + return nil, err + } filename := getFilePath(input.Path, true) diff --git a/transfer/download_parallel.go b/transfer/download_parallel.go index 01735c1..ba15d4b 100644 --- a/transfer/download_parallel.go +++ b/transfer/download_parallel.go @@ -2,6 +2,7 @@ package transfer import ( "fmt" + "runtime" "github.com/0glabs/0g-storage-client/common/parallel" "github.com/0glabs/0g-storage-client/core" @@ -12,11 +13,10 @@ import ( "github.com/sirupsen/logrus" ) -const minBufSize = 8 - type SegmentDownloader struct { - clients []*node.Client - file *download.DownloadingFile + clients []*node.Client + shardConfigs []*node.ShardConfig + file *download.DownloadingFile withProof bool @@ -27,7 +27,7 @@ type SegmentDownloader struct { var _ parallel.Interface = (*SegmentDownloader)(nil) -func NewSegmentDownloader(clients []*node.Client, file *download.DownloadingFile, withProof bool) (*SegmentDownloader, error) { +func NewSegmentDownloader(clients []*node.Client, shardConfigs []*node.ShardConfig, file *download.DownloadingFile, withProof bool) (*SegmentDownloader, error) { offset := file.Metadata().Offset if offset%core.DefaultSegmentSize > 0 { return nil, errors.Errorf("Invalid data offset in downloading file %v", offset) @@ -36,8 +36,9 @@ func NewSegmentDownloader(clients []*node.Client, file *download.DownloadingFile fileSize := file.Metadata().Size return &SegmentDownloader{ - clients: clients, - file: file, + clients: clients, + shardConfigs: shardConfigs, + file: file, withProof: withProof, @@ -50,13 +51,8 @@ func NewSegmentDownloader(clients []*node.Client, file *download.DownloadingFile // Download downloads segments in parallel. func (downloader *SegmentDownloader) Download() error { numTasks := downloader.numSegments - downloader.segmentOffset - numNodes := len(downloader.clients) - bufSize := numNodes * 2 - if bufSize < minBufSize { - bufSize = minBufSize - } - return parallel.Serial(downloader, int(numTasks), numNodes, bufSize) + return parallel.Serial(downloader, int(numTasks), runtime.GOMAXPROCS(0), 0) } // ParallelDo implements the parallel.Interface interface. @@ -70,28 +66,36 @@ func (downloader *SegmentDownloader) ParallelDo(routine, task int) (interface{}, root := downloader.file.Metadata().Root + clientIndex := routine % len(downloader.shardConfigs) + for segmentIndex%downloader.shardConfigs[clientIndex].NumShard != downloader.shardConfigs[clientIndex].ShardId { + clientIndex = (clientIndex + 1) % len(downloader.shardConfigs) + if clientIndex == routine%len(downloader.shardConfigs) { + return nil, fmt.Errorf("no storage node holds segment with index %v", segmentIndex) + } + } + var ( segment []byte err error ) if downloader.withProof { - segment, err = downloader.downloadWithProof(downloader.clients[routine], root, startIndex, endIndex) + segment, err = downloader.downloadWithProof(downloader.clients[clientIndex], root, startIndex, endIndex) } else { - segment, err = downloader.clients[routine].ZeroGStorage().DownloadSegment(root, startIndex, endIndex) + segment, err = downloader.clients[clientIndex].ZeroGStorage().DownloadSegment(root, startIndex, endIndex) } if err != nil { logrus.WithError(err).WithFields(logrus.Fields{ - "routine": routine, - "segment": fmt.Sprintf("%v/%v", segmentIndex, downloader.numSegments), - "chunks": fmt.Sprintf("[%v, %v)", startIndex, endIndex), + "client index": clientIndex, + "segment": fmt.Sprintf("%v/%v", segmentIndex, downloader.numSegments), + "chunks": fmt.Sprintf("[%v, %v)", startIndex, endIndex), }).Error("Failed to download segment") } else if logrus.IsLevelEnabled(logrus.TraceLevel) { logrus.WithFields(logrus.Fields{ - "routine": routine, - "segment": fmt.Sprintf("%v/%v", segmentIndex, downloader.numSegments), - "chunks": fmt.Sprintf("[%v, %v)", startIndex, endIndex), + "client index": clientIndex, + "segment": fmt.Sprintf("%v/%v", segmentIndex, downloader.numSegments), + "chunks": fmt.Sprintf("[%v, %v)", startIndex, endIndex), }).Trace("Succeeded to download segment") } diff --git a/transfer/downloader.go b/transfer/downloader.go index 3e5c264..ec24285 100644 --- a/transfer/downloader.go +++ b/transfer/downloader.go @@ -13,20 +13,27 @@ import ( ) type Downloader struct { - clients []*node.Client + clients []*node.Client + shardConfigs []*node.ShardConfig } -func NewDownloader(clients ...*node.Client) *Downloader { +func NewDownloader(clients ...*node.Client) (*Downloader, error) { if len(clients) == 0 { panic("storage node not specified") } - return &Downloader{ - clients: clients, + shardConfigs, err := getShardConfigs(clients) + if err != nil { + return nil, err } + + return &Downloader{ + clients: clients, + shardConfigs: shardConfigs, + }, nil } -func (downloader *Downloader) Download(root, filename string, proof bool) error { +func (downloader *Downloader) Download(root, filename string, withProof bool) error { hash := common.HexToHash(root) // Query file info from storage node @@ -41,7 +48,7 @@ func (downloader *Downloader) Download(root, filename string, proof bool) error } // Download segments - if err = downloader.downloadFile(filename, hash, int64(info.Tx.Size), proof); err != nil { + if err = downloader.downloadFile(filename, hash, int64(info.Tx.Size), withProof); err != nil { return errors.WithMessage(err, "Failed to download file") } @@ -54,7 +61,7 @@ func (downloader *Downloader) Download(root, filename string, proof bool) error } func (downloader *Downloader) queryFile(root common.Hash) (info *node.FileInfo, err error) { - // requires file finalized on all storage nodes + // do not require file finalized for _, v := range downloader.clients { info, err = v.ZeroGStorage().GetFileInfo(root) if err != nil { @@ -64,10 +71,6 @@ func (downloader *Downloader) queryFile(root common.Hash) (info *node.FileInfo, if info == nil { return nil, fmt.Errorf("file not found on node %v", v.URL()) } - - if !info.Finalized { - return nil, fmt.Errorf("file not finalized on node %v", v.URL()) - } } logrus.WithField("file", info).Debug("File found by root hash") @@ -99,16 +102,16 @@ func (downloader *Downloader) checkExistence(filename string, hash common.Hash) return errors.New("File already exists with different hash") } -func (downloader *Downloader) downloadFile(filename string, root common.Hash, size int64, proof bool) error { +func (downloader *Downloader) downloadFile(filename string, root common.Hash, size int64, withProof bool) error { file, err := download.CreateDownloadingFile(filename, root, size) if err != nil { return errors.WithMessage(err, "Failed to create downloading file") } defer file.Close() - logrus.WithField("threads", len(downloader.clients)).Info("Begin to download file from storage node") + logrus.WithField("clients", len(downloader.clients)).Info("Begin to download file from storage node") - sd, err := NewSegmentDownloader(downloader.clients, file, proof) + sd, err := NewSegmentDownloader(downloader.clients, downloader.shardConfigs, file, withProof) if err != nil { return errors.WithMessage(err, "Failed to create segment downloader") } diff --git a/transfer/uploader.go b/transfer/uploader.go index d7048ee..b310607 100644 --- a/transfer/uploader.go +++ b/transfer/uploader.go @@ -41,6 +41,21 @@ type Uploader struct { shardConfigs []*node.ShardConfig } +func getShardConfigs(clients []*node.Client) ([]*node.ShardConfig, error) { + shardConfigs := make([]*node.ShardConfig, 0) + for _, client := range clients { + shardConfig, err := client.ZeroGStorage().GetShardConfig() + if err != nil { + return nil, err + } + if shardConfig.NumShard == 0 { + return nil, errors.New("NumShard is zero") + } + shardConfigs = append(shardConfigs, shardConfig) + } + return shardConfigs, nil +} + func NewUploader(flow *contract.FlowContract, clients []*node.Client) (*Uploader, error) { uploader, err := NewUploaderLight(clients) if err != nil { @@ -55,17 +70,12 @@ func NewUploaderLight(clients []*node.Client) (*Uploader, error) { panic("storage node not specified") } zgClients := make([]*node.ZeroGStorageClient, 0) - shardConfigs := make([]*node.ShardConfig, 0) for _, client := range clients { zgClients = append(zgClients, client.ZeroGStorage()) - shardConfig, err := client.ZeroGStorage().GetShardConfig() - if err != nil { - return nil, err - } - if shardConfig.NumShard == 0 { - return nil, errors.New("NumShard is zero") - } - shardConfigs = append(shardConfigs, shardConfig) + } + shardConfigs, err := getShardConfigs(clients) + if err != nil { + return nil, err } return &Uploader{ clients: zgClients,