Skip to content

Commit

Permalink
Feat/dt graphsync pullreqs (#627)
Browse files Browse the repository at this point in the history
* graphsync responses to pull requests
  • Loading branch information
shannonwells authored and hannahhoward committed Dec 18, 2019
1 parent 3fddc49 commit 977ceee
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 99 deletions.
66 changes: 46 additions & 20 deletions datatransfer/impl/graphsync/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"fmt"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"math/rand"
"reflect"

Expand All @@ -14,6 +13,7 @@ import (
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/encoding/dagcbor"
ipldfree "github.com/ipld/go-ipld-prime/impl/free"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"

Expand Down Expand Up @@ -56,7 +56,7 @@ type graphsyncImpl struct {
}

type graphsyncReceiver struct {
ctx context.Context
ctx context.Context
impl *graphsyncImpl
}

Expand All @@ -76,10 +76,10 @@ func NewGraphSyncDataTransfer(parent context.Context, host host.Host, gs graphsy
}

// RegisterVoucherType registers a validator for the given voucher type
// will error if voucher type does not implement voucher
// or if there is a voucher type registered with an identical identifier
// This assumes that the voucherType is a pointer type, and that anything that implements datatransfer.Voucher
// Takes a pointer receiver.
// returns error if:
// * voucher type does not implement voucher
// * there is a voucher type registered with an identical identifier
// * voucherType's Kind is not reflect.Ptr
func (impl *graphsyncImpl) RegisterVoucherType(voucherType reflect.Type, validator datatransfer.RequestValidator) error {
if voucherType.Kind() != reflect.Ptr {
return fmt.Errorf("voucherType must be a reflect.Ptr Kind")
Expand Down Expand Up @@ -109,24 +109,28 @@ func (impl *graphsyncImpl) OpenPushDataChannel(ctx context.Context, to peer.ID,
if err != nil {
return datatransfer.ChannelID{}, err
}
chid := impl.createNewChannel(tid, to, baseCid, selector, voucher)
chid := impl.createNewChannel(tid, baseCid, selector, voucher, to, "", to)
return chid, nil
}

// OpenPullDataChannel opens a data transfer that will request data from the sending peer and
// transfer parts of the piece that match the selector
func (impl *graphsyncImpl) OpenPullDataChannel(ctx context.Context, to peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.ChannelID, error) {

tid, err := impl.sendRequest(ctx, selector, true, voucher, baseCid, to)
if err != nil {
return datatransfer.ChannelID{}, err
}
chid := impl.createNewChannel(tid, to, baseCid, selector, voucher)
chid := impl.createNewChannel(tid, baseCid, selector, voucher, to, to, "")
return chid, nil
}

// createNewChannel creates a new channel id
func (impl *graphsyncImpl) createNewChannel(tid datatransfer.TransferID, to peer.ID, baseCid cid.Cid, selector ipld.Node, voucher datatransfer.Voucher) datatransfer.ChannelID {
return datatransfer.ChannelID{To: to, ID: tid}
// createNewChannel creates a new channel id and channel state and saves to channels
func (impl *graphsyncImpl) createNewChannel(tid datatransfer.TransferID, baseCid cid.Cid, selector ipld.Node, voucher datatransfer.Voucher, to, sender, receiver peer.ID) datatransfer.ChannelID {
chid := datatransfer.ChannelID{To: to, ID: tid}
chst := datatransfer.ChannelState{Channel: datatransfer.NewChannel(0, baseCid, selector, voucher, sender, receiver, 0)}
impl.channels[chid] = chst
return chid
}

// sendRequest encapsulates message creation and posting to the data transfer network with the provided parameters
Expand All @@ -148,9 +152,9 @@ func (impl *graphsyncImpl) sendRequest(ctx context.Context, selector ipld.Node,
return tid, nil
}

func (impl *graphsyncImpl) sendResponse(ctx context.Context, isAccepted bool, to peer.ID, tid datatransfer.TransferID){
func (impl *graphsyncImpl) sendResponse(ctx context.Context, isAccepted bool, to peer.ID, tid datatransfer.TransferID) {
resp := message.NewResponse(tid, isAccepted)
if err := impl.dataTransferNetwork.SendMessage(ctx, to, resp); err != nil {
if err := impl.dataTransferNetwork.SendMessage(ctx, to, resp); err != nil {
log.Error(err)
}
}
Expand Down Expand Up @@ -186,7 +190,7 @@ func (impl *graphsyncImpl) unsubscribeAt(sub datatransfer.Subscriber) datatransf
}

func (impl *graphsyncImpl) notifySubscribers(evt datatransfer.Event, cs datatransfer.ChannelState) {
for _,cb := range impl.subscribers {
for _, cb := range impl.subscribers {
cb(evt, cs)
}
}
Expand Down Expand Up @@ -273,17 +277,39 @@ func (receiver *graphsyncReceiver) voucherFromRequest(incoming message.DataTrans
return voucher, nil
}

// ReceiveResponse handles responding to Push or Pull Requests.
// It schedules a graphsync transfer only if a Pull Request is accepted.
func (receiver *graphsyncReceiver) ReceiveResponse(
ctx context.Context,
sender peer.ID,
incoming message.DataTransferResponse) {
var evt datatransfer.Event
if !incoming.Accepted() {
evt = datatransfer.Error
} else {
evt = datatransfer.Progress // for now
evt := datatransfer.Error
chst := datatransfer.EmptyChannelState
if incoming.Accepted() {
chid := datatransfer.ChannelID{
To: sender,
ID: incoming.TransferID(),
}
if chst = receiver.impl.getPullChannel(chid); chst != datatransfer.EmptyChannelState {
baseCid := chst.BaseCID()
root := cidlink.Link{baseCid}
receiver.impl.gs.Request(ctx, sender, root, chst.Selector())
evt = datatransfer.Progress
}
receiver.impl.notifySubscribers(evt, datatransfer.ChannelState{})
}
receiver.impl.notifySubscribers(evt, chst)
}

// getPullChannel searches for a pull-type channel in the slice of channels with id `chid`.
// Returns datatransfer.EmptyChannelState if:
// * there is no channel with that id
// * it is not related to a pull request
func (impl *graphsyncImpl) getPullChannel(chid datatransfer.ChannelID) datatransfer.ChannelState {
channelState, ok := impl.channels[chid]
if !ok || channelState.Sender() == "" {
return datatransfer.EmptyChannelState
}
return channelState
}

func (receiver *graphsyncReceiver) ReceiveError(error) {}
Expand Down
211 changes: 132 additions & 79 deletions datatransfer/impl/graphsync/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,86 +773,138 @@ func TestDataTransferInitiatingPushGraphsyncRequests(t *testing.T) {
})
}

// TODO: get passing to complete https://github.com/filecoin-project/go-data-transfer/issues/21
func TestDataTransferInitiatingPullGraphsyncRequests(t *testing.T) {
//ctx := context.Background()
//ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
//defer cancel()
//gsData := newGraphsyncTestingData(t, ctx)
//host1 := gsData.host1
//host2 := gsData.host2
//
//gs2 := &fakeGraphSync{
// receivedRequests: make(chan receivedGraphSyncRequest, 1),
//}
//voucher := fakeDTType{"applesauce"}
//baseCid := testutil.GenerateCids(1)[0]
//
//gs1 := &fakeGraphSync{
// receivedRequests: make(chan receivedGraphSyncRequest, 1),
//}
//dt1 := NewGraphSyncDataTransfer(ctx, host1, gs1)
//
//t.Run("with successful validation", func(t *testing.T) {
// sv := newSV()
// sv.expectSuccessPull()
//
// dt2 := NewGraphSyncDataTransfer(ctx, host2, gs2)
// err := dt2.RegisterVoucherType(reflect.TypeOf(&fakeDTType{}), sv)
// require.NoError(t, err)
//
// _, err = dt1.OpenPullDataChannel(ctx, host2.ID(), &voucher, baseCid, gsData.allSelector)
// require.NoError(t, err)
//
// var requestReceived receivedGraphSyncRequest
// select {
// case <-ctx.Done():
// t.Fatal("did not receive message sent")
// case requestReceived = <-gs1.receivedRequests:
// }
//
// sv.verifyExpectations(t)
//
// receiver := requestReceived.p
// require.Equal(t, receiver, host2.ID())
//
// cl, ok := requestReceived.root.(cidlink.Link)
// require.True(t, ok)
// require.Equal(t, baseCid, cl.Cid)
//
// require.Equal(t, gsData.allSelector, requestReceived.selector)
//})
//
//t.Run("with error validation", func(t *testing.T) {
// sv := newSV()
// sv.expectErrorPull()
//
// dt2 := NewGraphSyncDataTransfer(ctx, host2, gs2)
// err := dt2.RegisterVoucherType(reflect.TypeOf(&fakeDTType{}), sv)
// require.NoError(t, err)
//
// subscribeCalls := make(chan struct{}, 1)
// subscribe := func(event datatransfer.Event, channelState datatransfer.ChannelState) {
// if event == datatransfer.Error {
// subscribeCalls <- struct{}{}
// }
// }
// unsub := dt1.SubscribeToEvents(subscribe)
// _, err = dt1.OpenPullDataChannel(ctx, host2.ID(), &voucher, baseCid, gsData.allSelector)
// require.NoError(t, err)
//
// select {
// case <-ctx.Done():
// t.Fatal("subscribed events not received")
// case <-subscribeCalls:
// }
//
// sv.verifyExpectations(t)
//
// // no graphsync request should be scheduled
// require.Empty(t, gs1.receivedRequests)
// unsub()
//})
ctx := context.Background()
gsData := newGraphsyncTestingData(t, ctx)
host1 := gsData.host1
host2 := gsData.host2

voucher := fakeDTType{"applesauce"}
baseCid := testutil.GenerateCids(1)[0]

t.Run("with successful validation", func(t *testing.T) {
gs1 := &fakeGraphSync{
receivedRequests: make(chan receivedGraphSyncRequest, 1),
}
gs2 := &fakeGraphSync{
receivedRequests: make(chan receivedGraphSyncRequest, 1),
}

sv := newSV()
sv.expectSuccessPull()

bg := ctx
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

dt1 := NewGraphSyncDataTransfer(bg, host1, gs1)
dt2 := NewGraphSyncDataTransfer(bg, host2, gs2)
err := dt2.RegisterVoucherType(reflect.TypeOf(&fakeDTType{}), sv)
require.NoError(t, err)

_, err = dt1.OpenPullDataChannel(ctx, host2.ID(), &voucher, baseCid, gsData.allSelector)
require.NoError(t, err)

var requestReceived receivedGraphSyncRequest
select {
case <-ctx.Done():
t.Fatal("did not receive message sent")
case requestReceived = <-gs1.receivedRequests:
}
sv.verifyExpectations(t)

receiver := requestReceived.p
require.Equal(t, receiver, host2.ID())

cl, ok := requestReceived.root.(cidlink.Link)
require.True(t, ok)
require.Equal(t, baseCid, cl.Cid)

require.Equal(t, gsData.allSelector, requestReceived.selector)
})

t.Run("with error validation", func(t *testing.T) {
gs1 := &fakeGraphSync{
receivedRequests: make(chan receivedGraphSyncRequest, 1),
}
gs2 := &fakeGraphSync{
receivedRequests: make(chan receivedGraphSyncRequest, 1),
}

dt1 := NewGraphSyncDataTransfer(ctx, host1, gs1)
sv := newSV()
sv.expectErrorPull()

dt2 := NewGraphSyncDataTransfer(ctx, host2, gs2)
err := dt2.RegisterVoucherType(reflect.TypeOf(&fakeDTType{}), sv)
require.NoError(t, err)

subscribeCalls := make(chan struct{}, 1)
subscribe := func(event datatransfer.Event, channelState datatransfer.ChannelState) {
if event == datatransfer.Error {
subscribeCalls <- struct{}{}
}
}
unsub := dt1.SubscribeToEvents(subscribe)
_, err = dt1.OpenPullDataChannel(ctx, host2.ID(), &voucher, baseCid, gsData.allSelector)
require.NoError(t, err)

select {
case <-ctx.Done():
t.Fatal("subscribed events not received")
case <-subscribeCalls:
}

// give a little time for the validation to happen
//time.Sleep(15*time.Millisecond)
sv.verifyExpectations(t)

// no graphsync request should be scheduled
require.Empty(t, gs1.receivedRequests)
unsub()
})

t.Run("does not schedule graphsync request if is push request", func(t *testing.T) {
gs1 := &fakeGraphSync{
receivedRequests: make(chan receivedGraphSyncRequest, 1),
}
gs2 := &fakeGraphSync{
receivedRequests: make(chan receivedGraphSyncRequest, 1),
}

sv := newSV()
sv.expectSuccessPush()

bg := ctx
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

dt1 := NewGraphSyncDataTransfer(bg, host1, gs1)
dt2 := NewGraphSyncDataTransfer(bg, host2, gs2)
err := dt2.RegisterVoucherType(reflect.TypeOf(&fakeDTType{}), sv)
require.NoError(t, err)

subscribeCalls := make(chan struct{}, 1)
subscribe := func(event datatransfer.Event, channelState datatransfer.ChannelState) {
if event == datatransfer.Error {
subscribeCalls <- struct{}{}
}
}
unsub := dt1.SubscribeToEvents(subscribe)
_, err = dt1.OpenPushDataChannel(ctx, host2.ID(), &voucher, baseCid, gsData.allSelector)
require.NoError(t, err)

select {
case <-ctx.Done():
t.Fatal("subscribed events not received")
case <-subscribeCalls:
}
sv.verifyExpectations(t)

// no graphsync request should be scheduled
require.Empty(t, gs1.receivedRequests)
unsub()
})
}

type receivedGraphSyncMessage struct {
Expand Down Expand Up @@ -1351,6 +1403,7 @@ type fakeGraphSync struct {

// Request initiates a new GraphSync request to the given peer using the given selector spec.
func (fgs *fakeGraphSync) Request(ctx context.Context, p peer.ID, root ipld.Link, selector ipld.Node, extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) {

fgs.receivedRequests <- receivedGraphSyncRequest{p, root, selector, extensions}
responses := make(chan graphsync.ResponseProgress)
errors := make(chan error)
Expand Down
2 changes: 2 additions & 0 deletions datatransfer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ type ChannelState struct {
received uint64
}

var EmptyChannelState = ChannelState{}

// Sent returns the number of bytes sent
func (c ChannelState) Sent() uint64 { return c.sent }

Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,7 @@ google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRn
google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1 h1:Hz2g2wirWK7H0qIIhGIqRGTuMwTE8HEKFnDZZ7lm9NU=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down

0 comments on commit 977ceee

Please sign in to comment.