Skip to content

Commit

Permalink
Implement retry on submission failures
Browse files Browse the repository at this point in the history
  • Loading branch information
bruno-valante committed Sep 20, 2024
1 parent af5b003 commit 2058b5c
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 4 deletions.
35 changes: 33 additions & 2 deletions transfer/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,24 @@ const defaultTaskSize = uint(10)

var dataAlreadyExistsError = "Invalid params: root; data: already uploaded and finalized"
var segmentAlreadyExistsError = "segment has already been uploaded or is being uploaded"
var tooManyDataError = "too many data writing"
var tooManyDataRetries = 12

func isDuplicateError(msg string) bool {
return strings.Contains(msg, dataAlreadyExistsError) || strings.Contains(msg, segmentAlreadyExistsError)
}

func isTooManyDataError(msg string) bool {
return strings.Contains(msg, tooManyDataError)
}

var submitLogEntryRetries = 12
var specifiedBlockError = "Specified block header does not exist"

func isRetriableSubmitLogEntryError(msg string) bool {
return strings.Contains(msg, specifiedBlockError)
}

type FinalityRequirement uint

const (
Expand Down Expand Up @@ -450,7 +463,16 @@ func (uploader *Uploader) SubmitLogEntry(ctx context.Context, datas []core.Itera
opts.Value = submissions[0].Fee(pricePerSector)
}
uploader.logger.WithField("fee(neuron)", opts.Value).Info("submit with fee")
tx, err = uploader.flow.Submit(opts, submissions[0])
for attempt := 0; attempt < submitLogEntryRetries; attempt++ {
tx, err = uploader.flow.Submit(opts, submissions[0])
if err == nil || isRetriableSubmitLogEntryError(err.Error()) || attempt >= submitLogEntryRetries-1 {
break
}
uploader.logger.WithFields(logrus.Fields{
"error": err,
"attempt": attempt,
}).Warn("Failed to submit, retrying...")
}
} else {
if fee != nil {
opts.Value = fee
Expand All @@ -461,7 +483,16 @@ func (uploader *Uploader) SubmitLogEntry(ctx context.Context, datas []core.Itera
}
}
uploader.logger.WithField("fee(neuron)", opts.Value).Info("batch submit with fee")
tx, err = uploader.flow.BatchSubmit(opts, submissions)
for attempt := 0; attempt < submitLogEntryRetries; attempt++ {
tx, err = uploader.flow.BatchSubmit(opts, submissions)
if err == nil || isRetriableSubmitLogEntryError(err.Error()) || attempt >= submitLogEntryRetries-1 {
break
}
uploader.logger.WithFields(logrus.Fields{
"error": err,
"attempt": attempt,
}).Warn("Failed to submit, retrying...")
}
}
if err != nil {
return common.Hash{}, nil, errors.WithMessage(err, "Failed to send transaction to append log entry")
Expand Down
17 changes: 15 additions & 2 deletions transfer/uploader_parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package transfer

import (
"context"
"time"

"github.com/0glabs/0g-storage-client/common/parallel"
"github.com/0glabs/0g-storage-client/core"
"github.com/0glabs/0g-storage-client/core/merkle"
"github.com/0glabs/0g-storage-client/node"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -74,8 +76,19 @@ func (uploader *segmentUploader) ParallelDo(ctx context.Context, routine int, ta
}
segIndex += uploadTask.numShard
}
if _, err := uploader.clients[uploadTask.clientIndex].UploadSegments(ctx, segments); err != nil && !isDuplicateError(err.Error()) {
return nil, err

for i := 0; i < tooManyDataRetries; i++ {
_, err := uploader.clients[uploadTask.clientIndex].UploadSegments(ctx, segments)
if err == nil || isDuplicateError(err.Error()) {
break
}

if isTooManyDataError(err.Error()) && i < tooManyDataRetries-1 {
time.Sleep(10 * time.Second)
continue
}

return nil, errors.WithMessage(err, "Failed to upload segment")
}

if uploader.logger.IsLevelEnabled(logrus.DebugLevel) {
Expand Down

0 comments on commit 2058b5c

Please sign in to comment.