diff --git a/go.mod b/go.mod index 2867420..f9959db 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.18 retract [v1.0.0, v1.0.1] require ( + github.com/Jorropo/channel v0.0.0-20230303124104-2821e25e07ff github.com/cenkalti/backoff v2.2.1+incompatible github.com/ipfs/go-block-format v0.0.3 github.com/ipfs/go-blockservice v0.4.0 @@ -24,6 +25,8 @@ require ( github.com/multiformats/go-multihash v0.2.1 ) +replace github.com/Jorropo/channel => github.com/MichaelMure/channel v0.0.0-20230303132646-a77d888b67d4 + require ( github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect github.com/go-logr/logr v1.2.3 // indirect diff --git a/go.sum b/go.sum index 5c69367..ab912de 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,8 @@ github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7/go.mod h1:bOv github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= +github.com/MichaelMure/channel v0.0.0-20230303132646-a77d888b67d4 h1:sdBlAQ0I35hcQ8eqA5O48wNoJ7e99z4DSTrc5PDrSRI= +github.com/MichaelMure/channel v0.0.0-20230303132646-a77d888b67d4/go.mod h1:mI95Zfa5HM2woyGuaxl2tTnfZKKzPAyjwcbvmMk7hwI= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= diff --git a/simple/reprovide.go b/simple/reprovide.go index 0225340..70366f2 100644 --- a/simple/reprovide.go +++ b/simple/reprovide.go @@ -4,8 +4,10 @@ import ( "context" "errors" "fmt" + "io" "time" + "github.com/Jorropo/channel" "github.com/cenkalti/backoff" "github.com/ipfs/go-cid" "github.com/ipfs/go-cidutil" @@ -180,8 +182,8 @@ func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc { // Pinner interface defines how the simple.Reprovider wants to interact // with a Pinning service type Pinner interface { - DirectKeys(ctx context.Context) ([]cid.Cid, error) - RecursiveKeys(ctx context.Context) ([]cid.Cid, error) + DirectKeys(ctx context.Context) channel.ReadOnly[cid.Cid] + RecursiveKeys(ctx context.Context) channel.ReadOnly[cid.Cid] } // NewPinnedProvider returns provider supplying pinned keys @@ -217,26 +219,25 @@ func pinSet(ctx context.Context, pinning Pinner, fetchConfig fetcher.Factory, on defer cancel() defer close(set.New) - dkeys, err := pinning.DirectKeys(ctx) - if err != nil { - logR.Errorf("reprovide direct pins: %s", err) - return - } - for _, key := range dkeys { + dkeys := pinning.DirectKeys(ctx) + for { + key, err := dkeys.ReadContext(ctx) + if err == io.EOF { + break + } + if err != nil { + logR.Errorf("reprovide direct pins: %s", err) + return + } set.Visitor(ctx)(key) } - rkeys, err := pinning.RecursiveKeys(ctx) - if err != nil { - logR.Errorf("reprovide indirect pins: %s", err) - return - } - session := fetchConfig.NewSession(ctx) - for _, key := range rkeys { - set.Visitor(ctx)(key) + + err := pinning.RecursiveKeys(ctx).RangeContext(ctx, func(c cid.Cid) error { + set.Visitor(ctx)(c) if !onlyRoots { - err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: key}, func(res fetcher.FetchResult) error { + err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: c}, func(res fetcher.FetchResult) error { clink, ok := res.LastBlockLink.(cidlink.Link) if ok { set.Visitor(ctx)(clink.Cid) @@ -244,10 +245,14 @@ func pinSet(ctx context.Context, pinning Pinner, fetchConfig fetcher.Factory, on return nil }) if err != nil { - logR.Errorf("reprovide indirect pins: %s", err) - return + return err } } + return nil + }) + if err != nil { + logR.Errorf("reprovide indirect pins: %s", err) + return } }() diff --git a/simple/reprovide_test.go b/simple/reprovide_test.go index e29d6e4..9642ad6 100644 --- a/simple/reprovide_test.go +++ b/simple/reprovide_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/Jorropo/channel" blocks "github.com/ipfs/go-block-format" bsrv "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" @@ -220,12 +221,32 @@ type mockPinner struct { direct []cid.Cid } -func (mp *mockPinner) DirectKeys(ctx context.Context) ([]cid.Cid, error) { - return mp.direct, nil +func (mp *mockPinner) DirectKeys(ctx context.Context) channel.ReadOnly[cid.Cid] { + c := channel.New[cid.Cid]() + go func() { + defer c.Close() + for _, p := range mp.direct { + err := c.WriteContext(ctx, p) + if err != nil { + return + } + } + }() + return c.ReadOnly() } -func (mp *mockPinner) RecursiveKeys(ctx context.Context) ([]cid.Cid, error) { - return mp.recursive, nil +func (mp *mockPinner) RecursiveKeys(ctx context.Context) channel.ReadOnly[cid.Cid] { + c := channel.New[cid.Cid]() + go func() { + defer c.Close() + for _, p := range mp.recursive { + err := c.WriteContext(ctx, p) + if err != nil { + return + } + } + }() + return c.ReadOnly() } func TestReprovidePinned(t *testing.T) {