feat: shard #10

Merged · 2 commits · Jun 7, 2024
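In brief (as read from the diff below): the boolean --disperse upload mode is removed in favor of shard-aware uploads. The uploader now queries each storage node's shard configuration via a new zgs_getShardConfig RPC, so NewUploader and NewUploaderLight return an error; segments are scheduled onto each node according to its ShardId and NumShard, and transaction finality is only awaited when a node stores all shards (NumShard == 1).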
8 changes: 4 additions & 4 deletions cmd/upload.go
@@ -24,7 +24,6 @@ var (
     node []string

     force bool
-    disperse bool
     taskSize uint
 }

@@ -51,7 +50,6 @@ func init() {
     uploadCmd.MarkFlagRequired("node")

     uploadCmd.Flags().BoolVar(&uploadArgs.force, "force", false, "Force to upload file even already exists")
-    uploadCmd.Flags().BoolVar(&uploadArgs.disperse, "disperse", false, "Disperse file amoung nodes")
     uploadCmd.Flags().UintVar(&uploadArgs.taskSize, "task-size", 10, "Number of segments to upload in single rpc request")

     rootCmd.AddCommand(uploadCmd)
@@ -70,11 +68,13 @@ func upload(*cobra.Command, []string) {
         defer client.Close()
     }

-    uploader := transfer.NewUploader(flow, clients)
+    uploader, err := transfer.NewUploader(flow, clients)
+    if err != nil {
+        logrus.WithError(err).Fatal("Failed to initialize uploader")
+    }
     opt := transfer.UploadOption{
         Tags:     hexutil.MustDecode(uploadArgs.tags),
         Force:    uploadArgs.force,
-        Disperse: uploadArgs.disperse,
         TaskSize: uploadArgs.taskSize,
     }

5 changes: 4 additions & 1 deletion gateway/local_apis.go
@@ -114,7 +114,10 @@ func uploadLocalFile(c *gin.Context) (interface{}, error) {
         return nil, ErrValidation.WithData("node index out of bound")
     }

-    uploader := transfer.NewUploaderLight([]*node.Client{allClients[input.Node]})
+    uploader, err := transfer.NewUploaderLight([]*node.Client{allClients[input.Node]})
+    if err != nil {
+        return nil, ErrValidation.WithData(err)
+    }

     filename := getFilePath(input.Path, false)

5 changes: 4 additions & 1 deletion kv/client.go
@@ -160,7 +160,10 @@ func (b *Batcher) Exec() error {
     }

     // upload file
-    uploader := transfer.NewUploader(b.client.flow, []*node.Client{b.client.node})
+    uploader, err := transfer.NewUploader(b.client.flow, []*node.Client{b.client.node})
+    if err != nil {
+        return err
+    }
     opt := transfer.UploadOption{
         Tags:  b.BuildTags(),
         Force: true,
5 changes: 5 additions & 0 deletions node/client.go
@@ -114,6 +114,11 @@ func (c *ZeroGStorageClient) DownloadSegmentWithProof(root common.Hash, index ui
     return
 }

+func (c *ZeroGStorageClient) GetShardConfig() (shardConfig *ShardConfig, err error) {
+    err = c.provider.CallContext(context.Background(), &shardConfig, "zgs_getShardConfig")
+    return
+}
+
 // Admin RPCs
 type AdminClient struct {
     provider *providers.MiddlewarableProvider
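A minimal sketch of calling the new RPC from Go, assuming only what the diff shows (ZeroGStorage().GetShardConfig() returning a *ShardConfig and an error); the zero-NumShard check mirrors transfer.NewUploaderLight below, and the import paths are inferred from the diff's usage:

import (
    "github.com/0glabs/0g-storage-client/node"
    "github.com/pkg/errors"
)

// checkShardConfig fetches a node's shard config and rejects a zero NumShard.
// Illustrative helper only; not part of this PR.
func checkShardConfig(client *node.Client) (*node.ShardConfig, error) {
    config, err := client.ZeroGStorage().GetShardConfig()
    if err != nil {
        return nil, errors.WithMessage(err, "Failed to query shard config")
    }
    if config.NumShard == 0 {
        return nil, errors.New("NumShard is zero")
    }
    return config, nil
}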
5 changes: 5 additions & 0 deletions node/types.go
@@ -48,3 +48,8 @@ type KeyValue struct {
     Data []byte `json:"data"` // value data
     Size uint64 `json:"size"` // value total size
 }
+
+type ShardConfig struct {
+    ShardId  uint64 `json:"shardId"`
+    NumShard uint64 `json:"numShard"`
+}
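Given the scheduling added in transfer/uploader.go below, a node advertising {NumShard: n, ShardId: i} is responsible for exactly the segment indices congruent to i modulo n. A hypothetical helper (not in this PR) stating that invariant:

// storesSegment reports whether a node with shard config cfg is expected to
// store the segment at index segIndex. Illustrative only.
func storesSegment(cfg ShardConfig, segIndex uint64) bool {
    return segIndex%cfg.NumShard == cfg.ShardId
}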
105 changes: 75 additions & 30 deletions transfer/uploader.go
@@ -3,6 +3,7 @@ package transfer
 import (
     "math/big"
     "runtime"
+    "sort"
     "time"

     "github.com/0glabs/0g-storage-client/common/parallel"
@@ -31,32 +32,49 @@ func isDuplicateError(msg string) bool {
 type UploadOption struct {
     Tags     []byte // for kv operations
     Force    bool   // for kv to upload same file
-    Disperse bool   // disperse files to different nodes
     TaskSize uint   // number of segment to upload in single rpc request
 }

 type Uploader struct {
-    flow    *contract.FlowContract
-    clients []*node.ZeroGStorageClient
+    flow         *contract.FlowContract
+    clients      []*node.ZeroGStorageClient
+    shardConfigs []*node.ShardConfig
 }

-func NewUploader(flow *contract.FlowContract, clients []*node.Client) *Uploader {
-    uploader := NewUploaderLight(clients)
+func NewUploader(flow *contract.FlowContract, clients []*node.Client) (*Uploader, error) {
+    uploader, err := NewUploaderLight(clients)
+    if err != nil {
+        return nil, err
+    }
     uploader.flow = flow
-    return uploader
+    return uploader, nil
 }

-func NewUploaderLight(clients []*node.Client) *Uploader {
+func NewUploaderLight(clients []*node.Client) (*Uploader, error) {
     if len(clients) == 0 {
         panic("storage node not specified")
     }
     zgClients := make([]*node.ZeroGStorageClient, 0)
+    shardConfigs := make([]*node.ShardConfig, 0)
     for _, client := range clients {
         zgClients = append(zgClients, client.ZeroGStorage())
+        shardConfig, err := client.ZeroGStorage().GetShardConfig()
+        if err != nil {
+            return nil, err
+        }
+        if shardConfig.NumShard == 0 {
+            return nil, errors.New("NumShard is zero")
+        }
+        shardConfigs = append(shardConfigs, shardConfig)
     }
     return &Uploader{
-        clients: zgClients,
-    }
+        clients:      zgClients,
+        shardConfigs: shardConfigs,
+    }, nil
 }

+func (uploader *Uploader) needFinality() bool {
+    return uploader.shardConfigs[0].NumShard == 1
+}
+
 // upload data(batchly in 1 blockchain transaction if there are more than one files)
@@ -127,13 +145,13 @@ func (uploader *Uploader) BatchUpload(datas []core.IterableData, waitForLogEntry

     for i := 0; i < n; i++ {
         // Upload file to storage node
-        if err := uploader.UploadFile(datas[i], trees[i], 0, opts[i].Disperse, opts[i].TaskSize); err != nil {
+        if err := uploader.UploadFile(datas[i], trees[i], 0, opts[i].TaskSize); err != nil {
             return common.Hash{}, nil, errors.WithMessage(err, "Failed to upload file")
         }

         if waitForLogEntry {
             // Wait for transaction finality
-            if err := uploader.waitForLogEntry(trees[i].Root(), !opts[i].Disperse, receipt); err != nil {
+            if err := uploader.waitForLogEntry(trees[i].Root(), uploader.needFinality(), receipt); err != nil {
                 return common.Hash{}, nil, errors.WithMessage(err, "Failed to wait for transaction finality on storage node")
             }
         }
@@ -220,12 +238,12 @@ func (uploader *Uploader) Upload(data core.IterableData, option ...UploadOption)
     }

     // Upload file to storage node
-    if err = uploader.UploadFile(data, tree, segNum, opt.Disperse, opt.TaskSize); err != nil {
+    if err = uploader.UploadFile(data, tree, segNum, opt.TaskSize); err != nil {
         return errors.WithMessage(err, "Failed to upload file")
     }

     // Wait for transaction finality
-    if err = uploader.waitForLogEntry(tree.Root(), !opt.Disperse, nil); err != nil {
+    if err = uploader.waitForLogEntry(tree.Root(), uploader.needFinality(), nil); err != nil {
         return errors.WithMessage(err, "Failed to wait for transaction finality on storage node")
     }

@@ -323,8 +341,48 @@ func (uploader *Uploader) waitForLogEntry(root common.Hash, finalityRequired boo
     return nil
 }

+func (uploader *Uploader) NewSegmentUploader(data core.IterableData, tree *merkle.Tree, startSegIndex uint64, taskSize uint) *SegmentUploader {
+    numSegments := data.NumSegments()
+    clientTasks := make([][]*UploadTask, 0)
+    for clientIndex, shardConfig := range uploader.shardConfigs {
+        var segIndex uint64
+        r := startSegIndex % shardConfig.NumShard
+        if r <= shardConfig.ShardId {
+            segIndex = startSegIndex + shardConfig.ShardId - r
+        } else {
+            segIndex = startSegIndex - r + shardConfig.ShardId + shardConfig.NumShard
+        }
+        tasks := make([]*UploadTask, 0)
+        for ; segIndex < numSegments; segIndex += shardConfig.NumShard * uint64(taskSize) {
+            tasks = append(tasks, &UploadTask{
+                clientIndex: clientIndex,
+                segIndex:    segIndex,
+                numShard:    shardConfig.NumShard,
+            })
+        }
+        clientTasks = append(clientTasks, tasks)
+    }
+    sort.SliceStable(clientTasks, func(i, j int) bool {
+        return len(clientTasks[i]) > len(clientTasks[j])
+    })
+    tasks := make([]*UploadTask, 0)
+    for taskIndex := 0; taskIndex < len(clientTasks[0]); taskIndex += 1 {
+        for i := 0; i < len(clientTasks) && taskIndex < len(clientTasks[i]); i += 1 {
+            tasks = append(tasks, clientTasks[i][taskIndex])
+        }
+    }
+
+    return &SegmentUploader{
+        data:     data,
+        tree:     tree,
+        clients:  uploader.clients,
+        tasks:    tasks,
+        taskSize: taskSize,
+    }
+}
+
 // TODO error tolerance
-func (uploader *Uploader) UploadFile(data core.IterableData, tree *merkle.Tree, segIndex uint64, disperse bool, taskSize uint) error {
+func (uploader *Uploader) UploadFile(data core.IterableData, tree *merkle.Tree, segIndex uint64, taskSize uint) error {
     stageTimer := time.Now()

     if taskSize == 0 {
@@ -333,32 +391,19 @@ func (uploader *Uploader) UploadFile(data core.IterableData, tree *merkle.Tree,

logrus.WithFields(logrus.Fields{
"segIndex": segIndex,
"disperse": disperse,
"nodeNum": len(uploader.clients),
}).Info("Begin to upload file")

offset := int64(segIndex * core.DefaultSegmentSize)

numSegments := (data.Size()-offset-1)/core.DefaultSegmentSize + 1
numTasks := int(numSegments-1)/int(taskSize) + 1
segmentUploader := &SegmentUploader{
data: data,
tree: tree,
clients: uploader.clients,
offset: offset,
disperse: disperse,
taskSize: taskSize,
numTasks: numTasks,
}
segmentUploader := uploader.NewSegmentUploader(data, tree, segIndex, taskSize)

err := parallel.Serial(segmentUploader, numTasks, min(runtime.GOMAXPROCS(0), len(uploader.clients)*5), 0)
err := parallel.Serial(segmentUploader, len(segmentUploader.tasks), min(runtime.GOMAXPROCS(0), len(uploader.clients)*5), 0)
if err != nil {
return err
}

logrus.WithFields(logrus.Fields{
"duration": time.Since(stageTimer),
"segNum": numSegments,
"segNum": data.NumSegments() - segIndex,
}).Info("Completed to upload file")

return nil
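The arithmetic at the top of NewSegmentUploader advances startSegIndex to the first segment owned by a node's shard; per-client task lists are then built with stride NumShard, sorted by descending length, and interleaved round-robin so the parallel workers spread load across nodes. A standalone sketch of just the alignment step (hypothetical helper, with worked values):

// firstAssignedSegment mirrors the alignment logic in NewSegmentUploader:
// the smallest index >= start such that index % numShard == shardId.
func firstAssignedSegment(start, shardId, numShard uint64) uint64 {
    r := start % numShard
    if r <= shardId {
        return start + shardId - r
    }
    return start - r + shardId + numShard
}

// With numShard = 4 and start = 5:
//   shardId 1: r = 1 <= 1, so 5 + 1 - 1 = 5 (and 5 % 4 == 1)
//   shardId 0: r = 1 > 0,  so 5 - 1 + 0 + 4 = 8 (and 8 % 4 == 0)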
60 changes: 14 additions & 46 deletions transfer/uploader_parallel.go
@@ -9,14 +9,18 @@ import (
"github.com/sirupsen/logrus"
)

type UploadTask struct {
clientIndex int
segIndex uint64
numShard uint64
}

type SegmentUploader struct {
data core.IterableData
tree *merkle.Tree
clients []*node.ZeroGStorageClient
offset int64
disperse bool
tasks []*UploadTask
taskSize uint
numTasks int
}

var _ parallel.Interface = (*SegmentUploader)(nil)
@@ -28,10 +32,10 @@ func (uploader *SegmentUploader) ParallelCollect(result *parallel.Result) error

 // ParallelDo implements parallel.Interface.
 func (uploader *SegmentUploader) ParallelDo(routine int, task int) (interface{}, error) {
-    offset := uploader.offset + int64(task)*core.DefaultSegmentSize
     numChunks := uploader.data.NumChunks()
     numSegments := uploader.data.NumSegments()
-    segIndex := uint64(offset / core.DefaultSegmentSize)
+    uploadTask := uploader.tasks[task]
+    segIndex := uploadTask.segIndex
     startSegIndex := segIndex
     segments := make([]node.SegmentWithProof, 0)
     for i := 0; i < int(uploader.taskSize); i++ {
@@ -43,7 +47,7 @@ func (uploader *SegmentUploader) ParallelDo(routine int, task int) (interface{},
             break
         }
         // get segment
-        segment, err := core.ReadAt(uploader.data, core.DefaultSegmentSize, offset, uploader.data.PaddedSize())
+        segment, err := core.ReadAt(uploader.data, core.DefaultSegmentSize, int64(segIndex*core.DefaultSegmentSize), uploader.data.PaddedSize())
         if err != nil {
             return nil, err
         }
@@ -66,54 +70,18 @@ func (uploader *SegmentUploader) ParallelDo(routine int, task int) (interface{},
         if allDataUploaded {
             break
         }
-        segIndex += uint64(uploader.numTasks)
-        offset += core.DefaultSegmentSize * int64(uploader.numTasks)
+        segIndex += uploadTask.numShard
     }
     // upload
-    if !uploader.disperse {
-        if _, err := uploader.clients[0].UploadSegments(segments); err != nil && !isDuplicateError(err.Error()) {
-            return nil, errors.WithMessage(err, "Failed to upload segment")
-        }
-    } else {
-        clientIndex := task % (len(uploader.clients))
-        ok := false
-        // retry
-        for i := 0; i < len(uploader.clients); i++ {
-            logrus.WithFields(logrus.Fields{
-                "total":          numSegments,
-                "from_seg_index": startSegIndex,
-                "to_seg_index":   segIndex,
-                "step":           uploader.numTasks,
-                "clientIndex":    clientIndex,
-            }).Debug("Uploading segment to node..")
-            if _, err := uploader.clients[clientIndex].UploadSegments(segments); err != nil && !isDuplicateError(err.Error()) {
-                logrus.WithFields(logrus.Fields{
-                    "total":          numSegments,
-                    "from_seg_index": startSegIndex,
-                    "to_seg_index":   segIndex,
-                    "step":           uploader.numTasks,
-                    "clientIndex":    clientIndex,
-                    "error":          err,
-                }).Warn("Failed to upload segment to node, try next node..")
-                clientIndex = (clientIndex + 1) % (len(uploader.clients))
-            } else {
-                ok = true
-                break
-            }
-        }
-        if !ok {
-            if _, err := uploader.clients[clientIndex].UploadSegments(segments); err != nil {
-                return nil, errors.WithMessage(err, "Failed to upload segment")
-            }
-        }
+    if _, err := uploader.clients[uploadTask.clientIndex].UploadSegments(segments); err != nil && !isDuplicateError(err.Error()) {
+        return nil, errors.WithMessage(err, "Failed to upload segment")
     }

     if logrus.IsLevelEnabled(logrus.DebugLevel) {
         logrus.WithFields(logrus.Fields{
             "total":          numSegments,
             "from_seg_index": startSegIndex,
             "to_seg_index":   segIndex,
-            "step":           uploader.numTasks,
+            "step":           uploadTask.numShard,
             "root":           core.SegmentRoot(segments[0].Data),
         }).Debug("Segments uploaded")
     }
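With sharding, a single UploadTask covers up to taskSize segments for one client, strided by numShard rather than contiguous as before. A hypothetical helper tracing the indices one task touches (the in-repo loop additionally stops early once the final data-carrying segment is uploaded):

// taskSegments lists the segment indices one UploadTask would upload:
// up to taskSize segments from startSegIndex, spaced numShard apart.
// Illustrative only; mirrors the stride in ParallelDo.
func taskSegments(startSegIndex, numShard, numSegments uint64, taskSize uint) []uint64 {
    indices := make([]uint64, 0, taskSize)
    for i, seg := uint(0), startSegIndex; i < taskSize && seg < numSegments; i, seg = i+1, seg+numShard {
        indices = append(indices, seg)
    }
    return indices
}

// taskSegments(4, 3, 100, 2) returns [4 7].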