feat: download by shard (#11)
* feat: download file by shard config

* fix: log
MiniFrenchBread authored Jun 14, 2024
1 parent 98d74b7 commit 977d3be
Showing 5 changed files with 69 additions and 46 deletions.
5 changes: 4 additions & 1 deletion cmd/download.go
@@ -37,7 +37,10 @@ func init() {
func download(*cobra.Command, []string) {
nodes := node.MustNewClients(downloadArgs.nodes)

downloader := transfer.NewDownloader(nodes...)
downloader, err := transfer.NewDownloader(nodes...)
if err != nil {
logrus.WithError(err).Fatal("Failed to initialize downloader")
}

if err := downloader.Download(downloadArgs.root, downloadArgs.file, downloadArgs.proof); err != nil {
logrus.WithError(err).Fatal("Failed to download file")
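
Note: a minimal end-to-end sketch of the updated API, assuming downloadArgs.nodes is simply a slice of node URLs; the endpoint, root hash, and output path below are hypothetical placeholders, not values from this commit.

package main

import (
	"github.com/0glabs/0g-storage-client/node"
	"github.com/0glabs/0g-storage-client/transfer"
	"github.com/sirupsen/logrus"
)

func main() {
	// Connect to one or more storage nodes (the endpoint is a placeholder).
	clients := node.MustNewClients([]string{"http://127.0.0.1:5678"})

	// NewDownloader now returns an error because it queries each node's
	// shard config at construction time.
	downloader, err := transfer.NewDownloader(clients...)
	if err != nil {
		logrus.WithError(err).Fatal("Failed to initialize downloader")
	}

	// Download by Merkle root with proof verification enabled; the root
	// hash and output path are placeholders.
	if err := downloader.Download("0x<root>", "file.bin", true); err != nil {
		logrus.WithError(err).Fatal("Failed to download file")
	}
}
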
5 changes: 4 additions & 1 deletion gateway/local_apis.go
@@ -150,7 +150,10 @@ func downloadFileLocal(c *gin.Context) (interface{}, error) {
return nil, ErrValidation.WithData("node index out of bound")
}

downloader := transfer.NewDownloader(allClients[input.Node])
downloader, err := transfer.NewDownloader(allClients[input.Node])
if err != nil {
return nil, err
}

filename := getFilePath(input.Path, true)

46 changes: 25 additions & 21 deletions transfer/download_parallel.go
@@ -2,6 +2,7 @@ package transfer

import (
"fmt"
"runtime"

"github.com/0glabs/0g-storage-client/common/parallel"
"github.com/0glabs/0g-storage-client/core"
Expand All @@ -12,11 +13,10 @@ import (
"github.com/sirupsen/logrus"
)

const minBufSize = 8

type SegmentDownloader struct {
clients []*node.Client
file *download.DownloadingFile
clients []*node.Client
shardConfigs []*node.ShardConfig
file *download.DownloadingFile

withProof bool

@@ -27,7 +27,7 @@ type SegmentDownloader struct {

var _ parallel.Interface = (*SegmentDownloader)(nil)

func NewSegmentDownloader(clients []*node.Client, file *download.DownloadingFile, withProof bool) (*SegmentDownloader, error) {
func NewSegmentDownloader(clients []*node.Client, shardConfigs []*node.ShardConfig, file *download.DownloadingFile, withProof bool) (*SegmentDownloader, error) {
offset := file.Metadata().Offset
if offset%core.DefaultSegmentSize > 0 {
return nil, errors.Errorf("Invalid data offset in downloading file %v", offset)
@@ -36,8 +36,9 @@ func NewSegmentDownloader(clients []*node.Client, file *download.DownloadingFile
fileSize := file.Metadata().Size

return &SegmentDownloader{
clients: clients,
file: file,
clients: clients,
shardConfigs: shardConfigs,
file: file,

withProof: withProof,

@@ -50,13 +51,8 @@ func NewSegmentDownloader(clients []*node.Client, file *download.DownloadingFile
// Download downloads segments in parallel.
func (downloader *SegmentDownloader) Download() error {
numTasks := downloader.numSegments - downloader.segmentOffset
numNodes := len(downloader.clients)
bufSize := numNodes * 2
if bufSize < minBufSize {
bufSize = minBufSize
}

return parallel.Serial(downloader, int(numTasks), numNodes, bufSize)
return parallel.Serial(downloader, int(numTasks), runtime.GOMAXPROCS(0), 0)
}

// ParallelDo implements the parallel.Interface interface.
@@ -70,28 +66,36 @@ func (downloader *SegmentDownloader) ParallelDo(routine, task int) (interface{},

root := downloader.file.Metadata().Root

clientIndex := routine % len(downloader.shardConfigs)
for segmentIndex%downloader.shardConfigs[clientIndex].NumShard != downloader.shardConfigs[clientIndex].ShardId {
clientIndex = (clientIndex + 1) % len(downloader.shardConfigs)
if clientIndex == routine%len(downloader.shardConfigs) {
return nil, fmt.Errorf("no storage node holds segment with index %v", segmentIndex)
}
}

var (
segment []byte
err error
)

if downloader.withProof {
segment, err = downloader.downloadWithProof(downloader.clients[routine], root, startIndex, endIndex)
segment, err = downloader.downloadWithProof(downloader.clients[clientIndex], root, startIndex, endIndex)
} else {
segment, err = downloader.clients[routine].ZeroGStorage().DownloadSegment(root, startIndex, endIndex)
segment, err = downloader.clients[clientIndex].ZeroGStorage().DownloadSegment(root, startIndex, endIndex)
}

if err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"routine": routine,
"segment": fmt.Sprintf("%v/%v", segmentIndex, downloader.numSegments),
"chunks": fmt.Sprintf("[%v, %v)", startIndex, endIndex),
"client index": clientIndex,
"segment": fmt.Sprintf("%v/%v", segmentIndex, downloader.numSegments),
"chunks": fmt.Sprintf("[%v, %v)", startIndex, endIndex),
}).Error("Failed to download segment")
} else if logrus.IsLevelEnabled(logrus.TraceLevel) {
logrus.WithFields(logrus.Fields{
"routine": routine,
"segment": fmt.Sprintf("%v/%v", segmentIndex, downloader.numSegments),
"chunks": fmt.Sprintf("[%v, %v)", startIndex, endIndex),
"client index": clientIndex,
"segment": fmt.Sprintf("%v/%v", segmentIndex, downloader.numSegments),
"chunks": fmt.Sprintf("[%v, %v)", startIndex, endIndex),
}).Trace("Succeeded to download segment")
}

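
Note: the new routing logic in ParallelDo assigns each segment to a client whose shard covers it: a node reporting (NumShard, ShardId) is expected to hold exactly the segments whose index satisfies index % NumShard == ShardId. Each worker starts from a routine-dependent client and rotates through the configured nodes until it finds a covering one, returning an error if none does. Below is a self-contained sketch of that selection rule; the shardConfig type and pickClient helper are illustrative names, not the library's API.

package main

import "fmt"

type shardConfig struct {
	NumShard uint64 // number of shards the node splits the segment space into
	ShardId  uint64 // which residue class of segment indexes this node stores
}

// pickClient returns the index of a client whose shard covers segmentIndex,
// starting from the routine's preferred client and rotating through the rest.
// NumShard is assumed to be non-zero (getShardConfigs rejects zero).
func pickClient(configs []shardConfig, routine int, segmentIndex uint64) (int, error) {
	start := routine % len(configs)
	i := start
	for segmentIndex%configs[i].NumShard != configs[i].ShardId {
		i = (i + 1) % len(configs)
		if i == start {
			return 0, fmt.Errorf("no storage node holds segment with index %v", segmentIndex)
		}
	}
	return i, nil
}

func main() {
	// Two half-shards plus one node holding every segment.
	configs := []shardConfig{
		{NumShard: 2, ShardId: 0}, // even segment indexes
		{NumShard: 2, ShardId: 1}, // odd segment indexes
		{NumShard: 1, ShardId: 0}, // all segment indexes
	}
	for seg := uint64(0); seg < 4; seg++ {
		idx, _ := pickClient(configs, 0, seg)
		fmt.Printf("segment %d -> client %d\n", seg, idx)
	}
}

Because each routine starts from a different client, segments that several nodes can serve are spread across those nodes rather than all hitting the first match.
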
31 changes: 17 additions & 14 deletions transfer/downloader.go
@@ -13,20 +13,27 @@ import (
)

type Downloader struct {
clients []*node.Client
clients []*node.Client
shardConfigs []*node.ShardConfig
}

func NewDownloader(clients ...*node.Client) *Downloader {
func NewDownloader(clients ...*node.Client) (*Downloader, error) {
if len(clients) == 0 {
panic("storage node not specified")
}

return &Downloader{
clients: clients,
shardConfigs, err := getShardConfigs(clients)
if err != nil {
return nil, err
}

return &Downloader{
clients: clients,
shardConfigs: shardConfigs,
}, nil
}

func (downloader *Downloader) Download(root, filename string, proof bool) error {
func (downloader *Downloader) Download(root, filename string, withProof bool) error {
hash := common.HexToHash(root)

// Query file info from storage node
@@ -41,7 +48,7 @@ func (downloader *Downloader) Download(root, filename string, proof bool) error
}

// Download segments
if err = downloader.downloadFile(filename, hash, int64(info.Tx.Size), proof); err != nil {
if err = downloader.downloadFile(filename, hash, int64(info.Tx.Size), withProof); err != nil {
return errors.WithMessage(err, "Failed to download file")
}

@@ -54,7 +61,7 @@ func (downloader *Downloader) Download(root, filename string, proof bool) error
}

func (downloader *Downloader) queryFile(root common.Hash) (info *node.FileInfo, err error) {
// requires file finalized on all storage nodes
// do not require file finalized
for _, v := range downloader.clients {
info, err = v.ZeroGStorage().GetFileInfo(root)
if err != nil {
@@ -64,10 +71,6 @@ func (downloader *Downloader) queryFile(root common.Hash) (info *node.FileInfo,
if info == nil {
return nil, fmt.Errorf("file not found on node %v", v.URL())
}

if !info.Finalized {
return nil, fmt.Errorf("file not finalized on node %v", v.URL())
}
}

logrus.WithField("file", info).Debug("File found by root hash")
@@ -99,16 +102,16 @@ func (downloader *Downloader) checkExistence(filename string, hash common.Hash)
return errors.New("File already exists with different hash")
}

func (downloader *Downloader) downloadFile(filename string, root common.Hash, size int64, proof bool) error {
func (downloader *Downloader) downloadFile(filename string, root common.Hash, size int64, withProof bool) error {
file, err := download.CreateDownloadingFile(filename, root, size)
if err != nil {
return errors.WithMessage(err, "Failed to create downloading file")
}
defer file.Close()

logrus.WithField("threads", len(downloader.clients)).Info("Begin to download file from storage node")
logrus.WithField("clients", len(downloader.clients)).Info("Begin to download file from storage node")

sd, err := NewSegmentDownloader(downloader.clients, file, proof)
sd, err := NewSegmentDownloader(downloader.clients, downloader.shardConfigs, file, withProof)
if err != nil {
return errors.WithMessage(err, "Failed to create segment downloader")
}
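
Note: the constructor now fails fast when any node's shard config cannot be fetched, or when a node reports NumShard == 0. A caller that prefers best-effort behaviour could drop unusable nodes before building the Downloader. This is not what the commit does; it is a hedged sketch that only uses calls visible in this diff (GetShardConfig, URL, NewDownloader), and the filtering policy itself is an assumption.

package example

import (
	"github.com/0glabs/0g-storage-client/node"
	"github.com/0glabs/0g-storage-client/transfer"
	"github.com/sirupsen/logrus"
)

// newDownloaderBestEffort keeps only the nodes that expose a usable shard
// config and builds a Downloader from them. The committed code fails on the
// first bad node instead of skipping it.
func newDownloaderBestEffort(clients []*node.Client) (*transfer.Downloader, error) {
	usable := make([]*node.Client, 0, len(clients))
	for _, client := range clients {
		config, err := client.ZeroGStorage().GetShardConfig()
		if err != nil || config.NumShard == 0 {
			logrus.WithError(err).WithField("url", client.URL()).Warn("Skipping node without usable shard config")
			continue
		}
		usable = append(usable, client)
	}
	// If no node survives the filter, NewDownloader panics ("storage node not specified").
	return transfer.NewDownloader(usable...)
}
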
28 changes: 19 additions & 9 deletions transfer/uploader.go
@@ -41,6 +41,21 @@ type Uploader struct {
shardConfigs []*node.ShardConfig
}

func getShardConfigs(clients []*node.Client) ([]*node.ShardConfig, error) {
shardConfigs := make([]*node.ShardConfig, 0)
for _, client := range clients {
shardConfig, err := client.ZeroGStorage().GetShardConfig()
if err != nil {
return nil, err
}
if shardConfig.NumShard == 0 {
return nil, errors.New("NumShard is zero")
}
shardConfigs = append(shardConfigs, shardConfig)
}
return shardConfigs, nil
}

func NewUploader(flow *contract.FlowContract, clients []*node.Client) (*Uploader, error) {
uploader, err := NewUploaderLight(clients)
if err != nil {
@@ -55,17 +70,12 @@ func NewUploaderLight(clients []*node.Client) (*Uploader, error) {
panic("storage node not specified")
}
zgClients := make([]*node.ZeroGStorageClient, 0)
shardConfigs := make([]*node.ShardConfig, 0)
for _, client := range clients {
zgClients = append(zgClients, client.ZeroGStorage())
shardConfig, err := client.ZeroGStorage().GetShardConfig()
if err != nil {
return nil, err
}
if shardConfig.NumShard == 0 {
return nil, errors.New("NumShard is zero")
}
shardConfigs = append(shardConfigs, shardConfig)
}
shardConfigs, err := getShardConfigs(clients)
if err != nil {
return nil, err
}
return &Uploader{
clients: zgClients,
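
Note: getShardConfigs is now the helper shared by the uploader and the downloader; it collects one shard config per node and rejects NumShard == 0. The commit does not verify that the collected configs cover every segment (the downloader simply fails per segment instead). Purely as an illustration of what coverage means under the index % NumShard == ShardId rule, here is a standalone check; the ShardConfig type and CoversAllSegments function below are not part of the library.

package example

// ShardConfig mirrors the two fields used by the download path.
type ShardConfig struct {
	NumShard uint64
	ShardId  uint64
}

func gcd(a, b uint64) uint64 {
	for b != 0 {
		a, b = b, a%b
	}
	return a
}

func lcm(a, b uint64) uint64 { return a / gcd(a, b) * b }

// CoversAllSegments reports whether every segment index is served by at least
// one config. Coverage is periodic with period lcm(NumShard_1, ..., NumShard_k),
// so checking one period suffices. Assumes every NumShard is non-zero, which
// getShardConfigs guarantees; the lcm may overflow for pathological inputs.
func CoversAllSegments(configs []ShardConfig) bool {
	if len(configs) == 0 {
		return false
	}
	period := uint64(1)
	for _, c := range configs {
		period = lcm(period, c.NumShard)
	}
	for r := uint64(0); r < period; r++ {
		covered := false
		for _, c := range configs {
			if r%c.NumShard == c.ShardId {
				covered = true
				break
			}
		}
		if !covered {
			return false
		}
	}
	return true
}

For example, configs (2, 0) and (2, 1) together cover everything, while (2, 0) and (4, 1) leave indexes congruent to 3 mod 4 unserved.
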
