From 977ceee0178cfa590b0b5abe558660d65dc612e1 Mon Sep 17 00:00:00 2001 From: Shannon Wells Date: Tue, 19 Nov 2019 15:17:39 -0800 Subject: [PATCH] Feat/dt graphsync pullreqs (#627) * graphsync responses to pull requests --- datatransfer/impl/graphsync/graphsync.go | 66 ++++-- datatransfer/impl/graphsync/graphsync_test.go | 211 +++++++++++------- datatransfer/types.go | 2 + go.sum | 1 + 4 files changed, 181 insertions(+), 99 deletions(-) diff --git a/datatransfer/impl/graphsync/graphsync.go b/datatransfer/impl/graphsync/graphsync.go index e4019785b09..66fe408ea5e 100644 --- a/datatransfer/impl/graphsync/graphsync.go +++ b/datatransfer/impl/graphsync/graphsync.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "fmt" - cidlink "github.com/ipld/go-ipld-prime/linking/cid" "math/rand" "reflect" @@ -14,6 +13,7 @@ import ( "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/encoding/dagcbor" ipldfree "github.com/ipld/go-ipld-prime/impl/free" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" @@ -56,7 +56,7 @@ type graphsyncImpl struct { } type graphsyncReceiver struct { - ctx context.Context + ctx context.Context impl *graphsyncImpl } @@ -76,10 +76,10 @@ func NewGraphSyncDataTransfer(parent context.Context, host host.Host, gs graphsy } // RegisterVoucherType registers a validator for the given voucher type -// will error if voucher type does not implement voucher -// or if there is a voucher type registered with an identical identifier -// This assumes that the voucherType is a pointer type, and that anything that implements datatransfer.Voucher -// Takes a pointer receiver. +// returns error if: +// * voucher type does not implement voucher +// * there is a voucher type registered with an identical identifier +// * voucherType's Kind is not reflect.Ptr func (impl *graphsyncImpl) RegisterVoucherType(voucherType reflect.Type, validator datatransfer.RequestValidator) error { if voucherType.Kind() != reflect.Ptr { return fmt.Errorf("voucherType must be a reflect.Ptr Kind") @@ -109,24 +109,28 @@ func (impl *graphsyncImpl) OpenPushDataChannel(ctx context.Context, to peer.ID, if err != nil { return datatransfer.ChannelID{}, err } - chid := impl.createNewChannel(tid, to, baseCid, selector, voucher) + chid := impl.createNewChannel(tid, baseCid, selector, voucher, to, "", to) return chid, nil } // OpenPullDataChannel opens a data transfer that will request data from the sending peer and // transfer parts of the piece that match the selector func (impl *graphsyncImpl) OpenPullDataChannel(ctx context.Context, to peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.ChannelID, error) { + tid, err := impl.sendRequest(ctx, selector, true, voucher, baseCid, to) if err != nil { return datatransfer.ChannelID{}, err } - chid := impl.createNewChannel(tid, to, baseCid, selector, voucher) + chid := impl.createNewChannel(tid, baseCid, selector, voucher, to, to, "") return chid, nil } -// createNewChannel creates a new channel id -func (impl *graphsyncImpl) createNewChannel(tid datatransfer.TransferID, to peer.ID, baseCid cid.Cid, selector ipld.Node, voucher datatransfer.Voucher) datatransfer.ChannelID { - return datatransfer.ChannelID{To: to, ID: tid} +// createNewChannel creates a new channel id and channel state and saves to channels +func (impl *graphsyncImpl) createNewChannel(tid datatransfer.TransferID, baseCid cid.Cid, selector ipld.Node, voucher datatransfer.Voucher, to, sender, receiver peer.ID) datatransfer.ChannelID { + chid := datatransfer.ChannelID{To: to, ID: tid} + chst := datatransfer.ChannelState{Channel: datatransfer.NewChannel(0, baseCid, selector, voucher, sender, receiver, 0)} + impl.channels[chid] = chst + return chid } // sendRequest encapsulates message creation and posting to the data transfer network with the provided parameters @@ -148,9 +152,9 @@ func (impl *graphsyncImpl) sendRequest(ctx context.Context, selector ipld.Node, return tid, nil } -func (impl *graphsyncImpl) sendResponse(ctx context.Context, isAccepted bool, to peer.ID, tid datatransfer.TransferID){ +func (impl *graphsyncImpl) sendResponse(ctx context.Context, isAccepted bool, to peer.ID, tid datatransfer.TransferID) { resp := message.NewResponse(tid, isAccepted) - if err := impl.dataTransferNetwork.SendMessage(ctx, to, resp); err != nil { + if err := impl.dataTransferNetwork.SendMessage(ctx, to, resp); err != nil { log.Error(err) } } @@ -186,7 +190,7 @@ func (impl *graphsyncImpl) unsubscribeAt(sub datatransfer.Subscriber) datatransf } func (impl *graphsyncImpl) notifySubscribers(evt datatransfer.Event, cs datatransfer.ChannelState) { - for _,cb := range impl.subscribers { + for _, cb := range impl.subscribers { cb(evt, cs) } } @@ -273,17 +277,39 @@ func (receiver *graphsyncReceiver) voucherFromRequest(incoming message.DataTrans return voucher, nil } +// ReceiveResponse handles responding to Push or Pull Requests. +// It schedules a graphsync transfer only if a Pull Request is accepted. func (receiver *graphsyncReceiver) ReceiveResponse( ctx context.Context, sender peer.ID, incoming message.DataTransferResponse) { - var evt datatransfer.Event - if !incoming.Accepted() { - evt = datatransfer.Error - } else { - evt = datatransfer.Progress // for now + evt := datatransfer.Error + chst := datatransfer.EmptyChannelState + if incoming.Accepted() { + chid := datatransfer.ChannelID{ + To: sender, + ID: incoming.TransferID(), + } + if chst = receiver.impl.getPullChannel(chid); chst != datatransfer.EmptyChannelState { + baseCid := chst.BaseCID() + root := cidlink.Link{baseCid} + receiver.impl.gs.Request(ctx, sender, root, chst.Selector()) + evt = datatransfer.Progress } - receiver.impl.notifySubscribers(evt, datatransfer.ChannelState{}) + } + receiver.impl.notifySubscribers(evt, chst) +} + +// getPullChannel searches for a pull-type channel in the slice of channels with id `chid`. +// Returns datatransfer.EmptyChannelState if: +// * there is no channel with that id +// * it is not related to a pull request +func (impl *graphsyncImpl) getPullChannel(chid datatransfer.ChannelID) datatransfer.ChannelState { + channelState, ok := impl.channels[chid] + if !ok || channelState.Sender() == "" { + return datatransfer.EmptyChannelState + } + return channelState } func (receiver *graphsyncReceiver) ReceiveError(error) {} diff --git a/datatransfer/impl/graphsync/graphsync_test.go b/datatransfer/impl/graphsync/graphsync_test.go index 213cd63e70f..a51c2e2eee6 100644 --- a/datatransfer/impl/graphsync/graphsync_test.go +++ b/datatransfer/impl/graphsync/graphsync_test.go @@ -773,86 +773,138 @@ func TestDataTransferInitiatingPushGraphsyncRequests(t *testing.T) { }) } -// TODO: get passing to complete https://github.com/filecoin-project/go-data-transfer/issues/21 func TestDataTransferInitiatingPullGraphsyncRequests(t *testing.T) { - //ctx := context.Background() - //ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - //defer cancel() - //gsData := newGraphsyncTestingData(t, ctx) - //host1 := gsData.host1 - //host2 := gsData.host2 - // - //gs2 := &fakeGraphSync{ - // receivedRequests: make(chan receivedGraphSyncRequest, 1), - //} - //voucher := fakeDTType{"applesauce"} - //baseCid := testutil.GenerateCids(1)[0] - // - //gs1 := &fakeGraphSync{ - // receivedRequests: make(chan receivedGraphSyncRequest, 1), - //} - //dt1 := NewGraphSyncDataTransfer(ctx, host1, gs1) - // - //t.Run("with successful validation", func(t *testing.T) { - // sv := newSV() - // sv.expectSuccessPull() - // - // dt2 := NewGraphSyncDataTransfer(ctx, host2, gs2) - // err := dt2.RegisterVoucherType(reflect.TypeOf(&fakeDTType{}), sv) - // require.NoError(t, err) - // - // _, err = dt1.OpenPullDataChannel(ctx, host2.ID(), &voucher, baseCid, gsData.allSelector) - // require.NoError(t, err) - // - // var requestReceived receivedGraphSyncRequest - // select { - // case <-ctx.Done(): - // t.Fatal("did not receive message sent") - // case requestReceived = <-gs1.receivedRequests: - // } - // - // sv.verifyExpectations(t) - // - // receiver := requestReceived.p - // require.Equal(t, receiver, host2.ID()) - // - // cl, ok := requestReceived.root.(cidlink.Link) - // require.True(t, ok) - // require.Equal(t, baseCid, cl.Cid) - // - // require.Equal(t, gsData.allSelector, requestReceived.selector) - //}) - // - //t.Run("with error validation", func(t *testing.T) { - // sv := newSV() - // sv.expectErrorPull() - // - // dt2 := NewGraphSyncDataTransfer(ctx, host2, gs2) - // err := dt2.RegisterVoucherType(reflect.TypeOf(&fakeDTType{}), sv) - // require.NoError(t, err) - // - // subscribeCalls := make(chan struct{}, 1) - // subscribe := func(event datatransfer.Event, channelState datatransfer.ChannelState) { - // if event == datatransfer.Error { - // subscribeCalls <- struct{}{} - // } - // } - // unsub := dt1.SubscribeToEvents(subscribe) - // _, err = dt1.OpenPullDataChannel(ctx, host2.ID(), &voucher, baseCid, gsData.allSelector) - // require.NoError(t, err) - // - // select { - // case <-ctx.Done(): - // t.Fatal("subscribed events not received") - // case <-subscribeCalls: - // } - // - // sv.verifyExpectations(t) - // - // // no graphsync request should be scheduled - // require.Empty(t, gs1.receivedRequests) - // unsub() - //}) + ctx := context.Background() + gsData := newGraphsyncTestingData(t, ctx) + host1 := gsData.host1 + host2 := gsData.host2 + + voucher := fakeDTType{"applesauce"} + baseCid := testutil.GenerateCids(1)[0] + + t.Run("with successful validation", func(t *testing.T) { + gs1 := &fakeGraphSync{ + receivedRequests: make(chan receivedGraphSyncRequest, 1), + } + gs2 := &fakeGraphSync{ + receivedRequests: make(chan receivedGraphSyncRequest, 1), + } + + sv := newSV() + sv.expectSuccessPull() + + bg := ctx + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + dt1 := NewGraphSyncDataTransfer(bg, host1, gs1) + dt2 := NewGraphSyncDataTransfer(bg, host2, gs2) + err := dt2.RegisterVoucherType(reflect.TypeOf(&fakeDTType{}), sv) + require.NoError(t, err) + + _, err = dt1.OpenPullDataChannel(ctx, host2.ID(), &voucher, baseCid, gsData.allSelector) + require.NoError(t, err) + + var requestReceived receivedGraphSyncRequest + select { + case <-ctx.Done(): + t.Fatal("did not receive message sent") + case requestReceived = <-gs1.receivedRequests: + } + sv.verifyExpectations(t) + + receiver := requestReceived.p + require.Equal(t, receiver, host2.ID()) + + cl, ok := requestReceived.root.(cidlink.Link) + require.True(t, ok) + require.Equal(t, baseCid, cl.Cid) + + require.Equal(t, gsData.allSelector, requestReceived.selector) + }) + + t.Run("with error validation", func(t *testing.T) { + gs1 := &fakeGraphSync{ + receivedRequests: make(chan receivedGraphSyncRequest, 1), + } + gs2 := &fakeGraphSync{ + receivedRequests: make(chan receivedGraphSyncRequest, 1), + } + + dt1 := NewGraphSyncDataTransfer(ctx, host1, gs1) + sv := newSV() + sv.expectErrorPull() + + dt2 := NewGraphSyncDataTransfer(ctx, host2, gs2) + err := dt2.RegisterVoucherType(reflect.TypeOf(&fakeDTType{}), sv) + require.NoError(t, err) + + subscribeCalls := make(chan struct{}, 1) + subscribe := func(event datatransfer.Event, channelState datatransfer.ChannelState) { + if event == datatransfer.Error { + subscribeCalls <- struct{}{} + } + } + unsub := dt1.SubscribeToEvents(subscribe) + _, err = dt1.OpenPullDataChannel(ctx, host2.ID(), &voucher, baseCid, gsData.allSelector) + require.NoError(t, err) + + select { + case <-ctx.Done(): + t.Fatal("subscribed events not received") + case <-subscribeCalls: + } + + // give a little time for the validation to happen + //time.Sleep(15*time.Millisecond) + sv.verifyExpectations(t) + + // no graphsync request should be scheduled + require.Empty(t, gs1.receivedRequests) + unsub() + }) + + t.Run("does not schedule graphsync request if is push request", func(t *testing.T) { + gs1 := &fakeGraphSync{ + receivedRequests: make(chan receivedGraphSyncRequest, 1), + } + gs2 := &fakeGraphSync{ + receivedRequests: make(chan receivedGraphSyncRequest, 1), + } + + sv := newSV() + sv.expectSuccessPush() + + bg := ctx + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + dt1 := NewGraphSyncDataTransfer(bg, host1, gs1) + dt2 := NewGraphSyncDataTransfer(bg, host2, gs2) + err := dt2.RegisterVoucherType(reflect.TypeOf(&fakeDTType{}), sv) + require.NoError(t, err) + + subscribeCalls := make(chan struct{}, 1) + subscribe := func(event datatransfer.Event, channelState datatransfer.ChannelState) { + if event == datatransfer.Error { + subscribeCalls <- struct{}{} + } + } + unsub := dt1.SubscribeToEvents(subscribe) + _, err = dt1.OpenPushDataChannel(ctx, host2.ID(), &voucher, baseCid, gsData.allSelector) + require.NoError(t, err) + + select { + case <-ctx.Done(): + t.Fatal("subscribed events not received") + case <-subscribeCalls: + } + sv.verifyExpectations(t) + + // no graphsync request should be scheduled + require.Empty(t, gs1.receivedRequests) + unsub() + }) } type receivedGraphSyncMessage struct { @@ -1351,6 +1403,7 @@ type fakeGraphSync struct { // Request initiates a new GraphSync request to the given peer using the given selector spec. func (fgs *fakeGraphSync) Request(ctx context.Context, p peer.ID, root ipld.Link, selector ipld.Node, extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) { + fgs.receivedRequests <- receivedGraphSyncRequest{p, root, selector, extensions} responses := make(chan graphsync.ResponseProgress) errors := make(chan error) diff --git a/datatransfer/types.go b/datatransfer/types.go index b096be69063..899d19ae9b7 100644 --- a/datatransfer/types.go +++ b/datatransfer/types.go @@ -109,6 +109,8 @@ type ChannelState struct { received uint64 } +var EmptyChannelState = ChannelState{} + // Sent returns the number of bytes sent func (c ChannelState) Sent() uint64 { return c.sent } diff --git a/go.sum b/go.sum index 075b752b7ea..48a97dbd050 100644 --- a/go.sum +++ b/go.sum @@ -731,6 +731,7 @@ google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRn google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.20.1 h1:Hz2g2wirWK7H0qIIhGIqRGTuMwTE8HEKFnDZZ7lm9NU= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=