From 0f69d97cba7e448c1b33c78085b4d144cf6db830 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Mur=C3=A9?= Date: Fri, 12 Jul 2019 17:00:42 +0200 Subject: [PATCH 1/2] pin ls --stream support --- shell.go | 41 ++++++++++++++++++++++++++++++++++++++++- shell_test.go | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 88 insertions(+), 1 deletion(-) diff --git a/shell.go b/shell.go index 12b6f2d70..bde800f29 100644 --- a/shell.go +++ b/shell.go @@ -228,7 +228,7 @@ type PinInfo struct { } // Pins returns a map of the pin hashes to their info (currently just the -// pin type, one of DirectPin, RecursivePin, or IndirectPin. A map is returned +// pin type, one of DirectPin, RecursivePin, or IndirectPin). A map is returned // instead of a slice because it is easier to do existence lookup by map key // than unordered array searching. The map is likely to be more useful to a // client than a flat list. @@ -237,6 +237,45 @@ func (s *Shell) Pins() (map[string]PinInfo, error) { return raw.Keys, s.Request("pin/ls").Exec(context.Background(), &raw) } +// PinStreamInfo is the output type for PinsStream +type PinStreamInfo struct { + Cid string + Type string +} + +// PinsStream is a streamed version of Pins. It returns a channel of the pins +// with their type, one of DirectPin, RecursivePin, or IndirectPin. +func (s *Shell) PinsStream() (<-chan PinStreamInfo, error) { + resp, err := s.Request("pin/ls"). + Option("stream", true). + Send(context.Background()) + if err != nil { + return nil, err + } + + if resp.Error != nil { + resp.Close() + return nil, resp.Error + } + + out := make(chan PinStreamInfo) + go func() { + defer resp.Close() + var pin PinStreamInfo + defer close(out) + dec := json.NewDecoder(resp.Output) + for { + err := dec.Decode(&pin) + if err != nil { + return + } + out <- pin + } + }() + + return out, nil +} + type PeerInfo struct { Addrs []string ID string diff --git a/shell_test.go b/shell_test.go index 59297ef0b..ca4e6710a 100644 --- a/shell_test.go +++ b/shell_test.go @@ -239,6 +239,54 @@ func TestPins(t *testing.T) { is.Equal(info.Type, RecursivePin) } +func TestPinsStream(t *testing.T) { + is := is.New(t) + s := NewShell(shellUrl) + + // Add a thing, which pins it by default + h, err := s.Add(bytes.NewBufferString("go-ipfs-api pins test 0C7023F8-1FEC-4155-A8E0-432A5853F46B")) + is.Nil(err) + + pinChan, err := s.PinsStream() + is.Nil(err) + + pins := accumulatePins(pinChan) + + _, ok := pins[h] + is.True(ok) + + err = s.Unpin(h) + is.Nil(err) + + pinChan, err = s.PinsStream() + is.Nil(err) + + pins = accumulatePins(pinChan) + + _, ok = pins[h] + is.False(ok) + + err = s.Pin(h) + is.Nil(err) + + pinChan, err = s.PinsStream() + is.Nil(err) + + pins = accumulatePins(pinChan) + + _type, ok := pins[h] + is.True(ok) + is.Equal(_type, RecursivePin) +} + +func accumulatePins(pinChan <-chan PinStreamInfo) map[string]string { + pins := make(map[string]string) + for pin := range pinChan { + pins[pin.Cid] = pin.Type + } + return pins +} + func TestPatch_rmLink(t *testing.T) { is := is.New(t) s := NewShell(shellUrl) From f19a34abfce3d9e26f4c192acffd2bf1cef2f03c Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 4 May 2020 17:39:39 -0700 Subject: [PATCH 2/2] fix: pass a context to PinsStream --- shell.go | 10 +++++++--- shell_test.go | 6 +++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/shell.go b/shell.go index bde800f29..6b6a5dd37 100644 --- a/shell.go +++ b/shell.go @@ -245,10 +245,10 @@ type PinStreamInfo struct { // PinsStream is a streamed version of Pins. It returns a channel of the pins // with their type, one of DirectPin, RecursivePin, or IndirectPin. -func (s *Shell) PinsStream() (<-chan PinStreamInfo, error) { +func (s *Shell) PinsStream(ctx context.Context) (<-chan PinStreamInfo, error) { resp, err := s.Request("pin/ls"). Option("stream", true). - Send(context.Background()) + Send(ctx) if err != nil { return nil, err } @@ -269,7 +269,11 @@ func (s *Shell) PinsStream() (<-chan PinStreamInfo, error) { if err != nil { return } - out <- pin + select { + case out <- pin: + case <-ctx.Done(): + return + } } }() diff --git a/shell_test.go b/shell_test.go index ca4e6710a..15c7ee61d 100644 --- a/shell_test.go +++ b/shell_test.go @@ -247,7 +247,7 @@ func TestPinsStream(t *testing.T) { h, err := s.Add(bytes.NewBufferString("go-ipfs-api pins test 0C7023F8-1FEC-4155-A8E0-432A5853F46B")) is.Nil(err) - pinChan, err := s.PinsStream() + pinChan, err := s.PinsStream(context.Background()) is.Nil(err) pins := accumulatePins(pinChan) @@ -258,7 +258,7 @@ func TestPinsStream(t *testing.T) { err = s.Unpin(h) is.Nil(err) - pinChan, err = s.PinsStream() + pinChan, err = s.PinsStream(context.Background()) is.Nil(err) pins = accumulatePins(pinChan) @@ -269,7 +269,7 @@ func TestPinsStream(t *testing.T) { err = s.Pin(h) is.Nil(err) - pinChan, err = s.PinsStream() + pinChan, err = s.PinsStream(context.Background()) is.Nil(err) pins = accumulatePins(pinChan)