Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

retry failed requests #12

Merged
merged 3 commits into from
Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions caboose.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ type Config struct {
PoolRefresh time.Duration
PoolMaxSize int
MaxConcurrency int
MaxRetries int
}

const DefaultMaxRetries = 3

var ErrNotImplemented error = errors.New("not implemented")

type Caboose struct {
Expand Down
38 changes: 29 additions & 9 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"io"
"net/http"
"net/url"
"os"
"strings"
"sync"
"time"

Expand All @@ -20,6 +22,9 @@ import (

// loadPool refreshes the set of endpoints to fetch cars from from the Orchestrator Endpoint
func (p *pool) loadPool() ([]string, error) {
if override := os.Getenv("CABOOSE_BACKEND_OVERRIDE"); len(override) > 0 {
return strings.Split(override, ","), nil
}
resp, err := p.config.OrchestratorClient.Get(p.config.OrchestratorEndpoint.String())
if err != nil {
goLogger.Warnw("failed to get backends from orchestrator", "err", err, "endpoint", p.config.OrchestratorEndpoint.String())
Expand Down Expand Up @@ -153,18 +158,33 @@ func (p *pool) Close() {
}
}

func (p *pool) fetchWith(ctx context.Context, c cid.Cid, with string) (blocks.Block, error) {
func (p *pool) fetchWith(ctx context.Context, c cid.Cid, with string) (blk blocks.Block, err error) {
<-p.started
aff := with
if aff == "" {
aff = c.Hash().B58String()

left := p.config.MaxRetries
if left == 0 {
left = DefaultMaxRetries
}
p.lk.RLock()
member := p.c.LocateKey([]byte(aff))
p.lk.RUnlock()
root := member.String()

return p.doFetch(ctx, root, c)
for i := 0; i < left; i++ {
aff := with
if aff == "" {
aff = fmt.Sprintf("%d%s", i, c.Hash().B58String())
} else {
aff = fmt.Sprintf("%d%s", i, aff)
}
p.lk.RLock()
member := p.c.LocateKey([]byte(aff))
p.lk.RUnlock()
root := member.String()

blk, err = p.doFetch(ctx, root, c)
if err != nil {
continue
}
return
}
return
}

var tmpl = "http://%s/ipfs/%s?format=raw"
Expand Down