Skip to content
This repository has been archived by the owner on Jun 20, 2023. It is now read-only.

Update to pinner having async pin listing #48

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.18
retract [v1.0.0, v1.0.1]

require (
github.com/Jorropo/channel v0.0.0-20230303124104-2821e25e07ff
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/ipfs/go-block-format v0.0.3
github.com/ipfs/go-blockservice v0.4.0
Expand All @@ -24,6 +25,8 @@ require (
github.com/multiformats/go-multihash v0.2.1
)

replace github.com/Jorropo/channel => github.com/MichaelMure/channel v0.0.0-20230303132646-a77d888b67d4

require (
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7/go.mod h1:bOv
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y=
github.com/MichaelMure/channel v0.0.0-20230303132646-a77d888b67d4 h1:sdBlAQ0I35hcQ8eqA5O48wNoJ7e99z4DSTrc5PDrSRI=
github.com/MichaelMure/channel v0.0.0-20230303132646-a77d888b67d4/go.mod h1:mI95Zfa5HM2woyGuaxl2tTnfZKKzPAyjwcbvmMk7hwI=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
Expand Down
43 changes: 24 additions & 19 deletions simple/reprovide.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"errors"
"fmt"
"io"
"time"

"github.com/Jorropo/channel"
"github.com/cenkalti/backoff"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-cidutil"
Expand Down Expand Up @@ -180,8 +182,8 @@ func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc {
// Pinner interface defines how the simple.Reprovider wants to interact
// with a Pinning service
type Pinner interface {
DirectKeys(ctx context.Context) ([]cid.Cid, error)
RecursiveKeys(ctx context.Context) ([]cid.Cid, error)
DirectKeys(ctx context.Context) channel.ReadOnly[cid.Cid]
RecursiveKeys(ctx context.Context) channel.ReadOnly[cid.Cid]
}

// NewPinnedProvider returns provider supplying pinned keys
Expand Down Expand Up @@ -217,37 +219,40 @@ func pinSet(ctx context.Context, pinning Pinner, fetchConfig fetcher.Factory, on
defer cancel()
defer close(set.New)

dkeys, err := pinning.DirectKeys(ctx)
if err != nil {
logR.Errorf("reprovide direct pins: %s", err)
return
}
for _, key := range dkeys {
dkeys := pinning.DirectKeys(ctx)
for {
key, err := dkeys.ReadContext(ctx)
if err == io.EOF {
break
}
if err != nil {
logR.Errorf("reprovide direct pins: %s", err)
return
}
set.Visitor(ctx)(key)
}

rkeys, err := pinning.RecursiveKeys(ctx)
if err != nil {
logR.Errorf("reprovide indirect pins: %s", err)
return
}

session := fetchConfig.NewSession(ctx)
for _, key := range rkeys {
set.Visitor(ctx)(key)

err := pinning.RecursiveKeys(ctx).RangeContext(ctx, func(c cid.Cid) error {
set.Visitor(ctx)(c)
if !onlyRoots {
err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: key}, func(res fetcher.FetchResult) error {
err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: c}, func(res fetcher.FetchResult) error {
clink, ok := res.LastBlockLink.(cidlink.Link)
if ok {
set.Visitor(ctx)(clink.Cid)
}
return nil
})
if err != nil {
logR.Errorf("reprovide indirect pins: %s", err)
return
return err
}
}
return nil
})
if err != nil {
logR.Errorf("reprovide indirect pins: %s", err)
return
}
}()

Expand Down
29 changes: 25 additions & 4 deletions simple/reprovide_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/Jorropo/channel"
blocks "github.com/ipfs/go-block-format"
bsrv "github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -220,12 +221,32 @@ type mockPinner struct {
direct []cid.Cid
}

func (mp *mockPinner) DirectKeys(ctx context.Context) ([]cid.Cid, error) {
return mp.direct, nil
func (mp *mockPinner) DirectKeys(ctx context.Context) channel.ReadOnly[cid.Cid] {
c := channel.New[cid.Cid]()
go func() {
defer c.Close()
for _, p := range mp.direct {
err := c.WriteContext(ctx, p)
if err != nil {
return
}
}
}()
return c.ReadOnly()
}

func (mp *mockPinner) RecursiveKeys(ctx context.Context) ([]cid.Cid, error) {
return mp.recursive, nil
func (mp *mockPinner) RecursiveKeys(ctx context.Context) channel.ReadOnly[cid.Cid] {
c := channel.New[cid.Cid]()
go func() {
defer c.Close()
for _, p := range mp.recursive {
err := c.WriteContext(ctx, p)
if err != nil {
return
}
}
}()
return c.ReadOnly()
}

func TestReprovidePinned(t *testing.T) {
Expand Down