Skip to content
This repository has been archived by the owner on Feb 7, 2024. It is now read-only.

pin ls --stream support #190

Merged
merged 2 commits into from
May 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ type PinInfo struct {
}

// Pins returns a map of the pin hashes to their info (currently just the
// pin type, one of DirectPin, RecursivePin, or IndirectPin. A map is returned
// pin type, one of DirectPin, RecursivePin, or IndirectPin). A map is returned
// instead of a slice because it is easier to do existence lookup by map key
// than unordered array searching. The map is likely to be more useful to a
// client than a flat list.
Expand All @@ -237,6 +237,49 @@ func (s *Shell) Pins() (map[string]PinInfo, error) {
return raw.Keys, s.Request("pin/ls").Exec(context.Background(), &raw)
}

// PinStreamInfo is the output type for PinsStream
type PinStreamInfo struct {
Cid string
Type string
}

// PinsStream is a streamed version of Pins. It returns a channel of the pins
// with their type, one of DirectPin, RecursivePin, or IndirectPin.
func (s *Shell) PinsStream(ctx context.Context) (<-chan PinStreamInfo, error) {
resp, err := s.Request("pin/ls").
Option("stream", true).
Send(ctx)
if err != nil {
return nil, err
}

if resp.Error != nil {
resp.Close()
return nil, resp.Error
}

out := make(chan PinStreamInfo)
go func() {
defer resp.Close()
var pin PinStreamInfo
defer close(out)
dec := json.NewDecoder(resp.Output)
for {
err := dec.Decode(&pin)
if err != nil {
return
}
select {
case out <- pin:
case <-ctx.Done():
return
}
}
}()

return out, nil
}

type PeerInfo struct {
Addrs []string
ID string
Expand Down
48 changes: 48 additions & 0 deletions shell_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,54 @@ func TestPins(t *testing.T) {
is.Equal(info.Type, RecursivePin)
}

func TestPinsStream(t *testing.T) {
is := is.New(t)
s := NewShell(shellUrl)

// Add a thing, which pins it by default
h, err := s.Add(bytes.NewBufferString("go-ipfs-api pins test 0C7023F8-1FEC-4155-A8E0-432A5853F46B"))
is.Nil(err)

pinChan, err := s.PinsStream(context.Background())
is.Nil(err)

pins := accumulatePins(pinChan)

_, ok := pins[h]
is.True(ok)

err = s.Unpin(h)
is.Nil(err)

pinChan, err = s.PinsStream(context.Background())
is.Nil(err)

pins = accumulatePins(pinChan)

_, ok = pins[h]
is.False(ok)

err = s.Pin(h)
is.Nil(err)

pinChan, err = s.PinsStream(context.Background())
is.Nil(err)

pins = accumulatePins(pinChan)

_type, ok := pins[h]
is.True(ok)
is.Equal(_type, RecursivePin)
}

func accumulatePins(pinChan <-chan PinStreamInfo) map[string]string {
pins := make(map[string]string)
for pin := range pinChan {
pins[pin.Cid] = pin.Type
}
return pins
}

func TestPatch_rmLink(t *testing.T) {
is := is.New(t)
s := NewShell(shellUrl)
Expand Down