From 40c7b64e9094b08951744e6e557d2d645d76dfd0 Mon Sep 17 00:00:00 2001 From: Chenxing Li Date: Mon, 23 Sep 2024 14:08:06 +0800 Subject: [PATCH] Implement retry on submission failures (#61) Co-authored-by: Bruno Valente <140794260+bruno-valante@users.noreply.github.com> --- transfer/uploader.go | 37 +++++++++++++++++++++++++++++++++-- transfer/uploader_parallel.go | 17 ++++++++++++++-- 2 files changed, 50 insertions(+), 4 deletions(-) diff --git a/transfer/uploader.go b/transfer/uploader.go index 3dd2fd7..1978bc4 100644 --- a/transfer/uploader.go +++ b/transfer/uploader.go @@ -36,11 +36,24 @@ const defaultTaskSize = uint(10) var dataAlreadyExistsError = "Invalid params: root; data: already uploaded and finalized" var segmentAlreadyExistsError = "segment has already been uploaded or is being uploaded" +var tooManyDataError = "too many data writing" +var tooManyDataRetries = 12 func isDuplicateError(msg string) bool { return strings.Contains(msg, dataAlreadyExistsError) || strings.Contains(msg, segmentAlreadyExistsError) } +func isTooManyDataError(msg string) bool { + return strings.Contains(msg, tooManyDataError) +} + +var submitLogEntryRetries = 12 +var specifiedBlockError = "Specified block header does not exist" + +func isRetriableSubmitLogEntryError(msg string) bool { + return strings.Contains(msg, specifiedBlockError) +} + type FinalityRequirement uint const ( @@ -450,7 +463,17 @@ func (uploader *Uploader) SubmitLogEntry(ctx context.Context, datas []core.Itera opts.Value = submissions[0].Fee(pricePerSector) } uploader.logger.WithField("fee(neuron)", opts.Value).Info("submit with fee") - tx, err = uploader.flow.Submit(opts, submissions[0]) + for attempt := 0; attempt < submitLogEntryRetries; attempt++ { + tx, err = uploader.flow.Submit(opts, submissions[0]) + if err == nil || !isRetriableSubmitLogEntryError(err.Error()) || attempt >= submitLogEntryRetries-1 { + break + } + uploader.logger.WithFields(logrus.Fields{ + "error": err, + "attempt": attempt, + }).Warn("Failed to submit, retrying...") + time.Sleep(10 * time.Second) + } } else { if fee != nil { opts.Value = fee @@ -461,7 +484,17 @@ func (uploader *Uploader) SubmitLogEntry(ctx context.Context, datas []core.Itera } } uploader.logger.WithField("fee(neuron)", opts.Value).Info("batch submit with fee") - tx, err = uploader.flow.BatchSubmit(opts, submissions) + for attempt := 0; attempt < submitLogEntryRetries; attempt++ { + tx, err = uploader.flow.BatchSubmit(opts, submissions) + if err == nil || !isRetriableSubmitLogEntryError(err.Error()) || attempt >= submitLogEntryRetries-1 { + break + } + uploader.logger.WithFields(logrus.Fields{ + "error": err, + "attempt": attempt, + }).Warn("Failed to submit, retrying...") + time.Sleep(10 * time.Second) + } } if err != nil { return common.Hash{}, nil, errors.WithMessage(err, "Failed to send transaction to append log entry") diff --git a/transfer/uploader_parallel.go b/transfer/uploader_parallel.go index 0413ebf..6f8dfa1 100644 --- a/transfer/uploader_parallel.go +++ b/transfer/uploader_parallel.go @@ -2,11 +2,13 @@ package transfer import ( "context" + "time" "github.com/0glabs/0g-storage-client/common/parallel" "github.com/0glabs/0g-storage-client/core" "github.com/0glabs/0g-storage-client/core/merkle" "github.com/0glabs/0g-storage-client/node" + "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -74,8 +76,19 @@ func (uploader *segmentUploader) ParallelDo(ctx context.Context, routine int, ta } segIndex += uploadTask.numShard } - if _, err := uploader.clients[uploadTask.clientIndex].UploadSegments(ctx, segments); err != nil && !isDuplicateError(err.Error()) { - return nil, err + + for i := 0; i < tooManyDataRetries; i++ { + _, err := uploader.clients[uploadTask.clientIndex].UploadSegments(ctx, segments) + if err == nil || isDuplicateError(err.Error()) { + break + } + + if isTooManyDataError(err.Error()) && i < tooManyDataRetries-1 { + time.Sleep(10 * time.Second) + continue + } + + return nil, errors.WithMessage(err, "Failed to upload segment") } if uploader.logger.IsLevelEnabled(logrus.DebugLevel) {