Skip to content

Commit

Permalink
Techdebt/dt split graphsync impl receiver (#651)
Browse files Browse the repository at this point in the history
* Split up graphsyncImpl and graphsyncReceiver
* rename graphsync to utils
  • Loading branch information
shannonwells authored and hannahhoward committed Dec 6, 2019
1 parent 9070e6c commit f4ffd41
Show file tree
Hide file tree
Showing 5 changed files with 342 additions and 303 deletions.
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
package graphsyncimpl

import (
"bytes"
"context"
"fmt"
"reflect"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync"
logging "github.com/ipfs/go-log"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/encoding/dagcbor"
ipldfree "github.com/ipld/go-ipld-prime/impl/free"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"

Expand All @@ -21,8 +16,6 @@ import (
"github.com/filecoin-project/lotus/datatransfer/network"
)

var log = logging.Logger("graphsync-impl")

const (
// ExtensionDataTransfer is the identifier for the data transfer extension to graphsync
ExtensionDataTransfer = graphsync.ExtensionName("fil/data-transfer")
Expand Down Expand Up @@ -56,11 +49,6 @@ type graphsyncImpl struct {
lastTID int64
}

type graphsyncReceiver struct {
ctx context.Context
impl *graphsyncImpl
}

// NewGraphSyncDataTransfer initializes a new graphsync based data transfer manager
func NewGraphSyncDataTransfer(parent context.Context, host host.Host, gs graphsync.GraphExchange) datatransfer.Manager {
dataTransferNetwork := network.NewFromLibp2pHost(host)
Expand Down Expand Up @@ -112,9 +100,8 @@ func (impl *graphsyncImpl) OpenPushDataChannel(ctx context.Context, requestTo pe
if err != nil {
return datatransfer.ChannelID{}, err
}
// initiator = me, sender = me, receiver = them
// initiator = us, sender = us, receiver = them
chid := impl.createNewChannel(tid, baseCid, selector, voucher,
//requestTo, impl.peerID, requestTo)
impl.peerID, impl.peerID, requestTo)
return chid, nil
}
Expand All @@ -127,7 +114,7 @@ func (impl *graphsyncImpl) OpenPullDataChannel(ctx context.Context, requestTo pe
if err != nil {
return datatransfer.ChannelID{}, err
}
// initiator = me, sender = them, receiver = me
// initiator = us, sender = them, receiver = us
chid := impl.createNewChannel(tid, baseCid, selector, voucher,
impl.peerID, requestTo, impl.peerID)
return chid, nil
Expand Down Expand Up @@ -208,106 +195,6 @@ func (impl *graphsyncImpl) InProgressChannels() map[datatransfer.ChannelID]datat
return impl.channels
}

// ReceiveRequest takes an incoming request, validates the voucher and processes the message.
func (receiver *graphsyncReceiver) ReceiveRequest(
ctx context.Context,
sender peer.ID,
incoming message.DataTransferRequest) {

// not yet doing anything else with the voucher
_, err := receiver.validateVoucher(sender, incoming)
if err != nil {
receiver.impl.sendResponse(ctx, false, sender, incoming.TransferID())
return
}
stor, _ := nodeFromBytes(incoming.Selector())
root := cidlink.Link{incoming.BaseCid()}
if !incoming.IsPull() {
receiver.impl.gs.Request(ctx, sender, root, stor)
}
receiver.impl.sendResponse(ctx, true, sender, incoming.TransferID())
}

// validateVoucher converts a voucher in an incoming message to its appropriate
// voucher struct, then runs the validator and returns the results.
// returns error if:
// * voucherFromRequest fails
// * deserialization of selector fails
// * validation fails
func (receiver *graphsyncReceiver) validateVoucher(sender peer.ID, incoming message.DataTransferRequest) (datatransfer.Voucher, error) {

vtypStr := incoming.VoucherType()
vouch, err := receiver.voucherFromRequest(incoming)
if err != nil {
return vouch, err
}

var validatorFunc func(peer.ID, datatransfer.Voucher, cid.Cid, ipld.Node) error
if incoming.IsPull() {
validatorFunc = receiver.impl.validatedTypes[vtypStr].validator.ValidatePull
} else {
validatorFunc = receiver.impl.validatedTypes[vtypStr].validator.ValidatePush
}

stor, err := nodeFromBytes(incoming.Selector())
if err != nil {
return vouch, err
}

if err = validatorFunc(sender, vouch, incoming.BaseCid(), stor); err != nil {
return nil, err
}

return vouch, nil
}

// voucherFromRequest takes an incoming request and attempts to create a
// voucher struct from it using the registered validated types. It returns
// a deserialized voucher and any error. It returns error if:
// * the voucher type has no validator registered
// * the voucher cannot be instantiated via reflection
// * request voucher bytes cannot be deserialized via <voucher>.FromBytes()
func (receiver *graphsyncReceiver) voucherFromRequest(incoming message.DataTransferRequest) (datatransfer.Voucher, error) {
vtypStr := incoming.VoucherType()

validatedType, ok := receiver.impl.validatedTypes[vtypStr]
if !ok {
return nil, fmt.Errorf("unregistered voucher type %s", vtypStr)
}
vStructVal := reflect.New(validatedType.voucherType.Elem())
voucher, ok := vStructVal.Interface().(datatransfer.Voucher)
if !ok || reflect.ValueOf(voucher).IsNil() {
return nil, fmt.Errorf("problem instantiating type %s, voucher: %v", vtypStr, voucher)
}
if err := voucher.FromBytes(incoming.Voucher()); err != nil {
return voucher, err
}
return voucher, nil
}

// ReceiveResponse handles responding to Push or Pull Requests.
// It schedules a graphsync transfer only if a Pull Request is accepted.
func (receiver *graphsyncReceiver) ReceiveResponse(
ctx context.Context,
sender peer.ID,
incoming message.DataTransferResponse) {
evt := datatransfer.Error
chst := datatransfer.EmptyChannelState
if incoming.Accepted() {
chid := datatransfer.ChannelID{
Initiator: receiver.impl.peerID,
ID: incoming.TransferID(),
}
if chst = receiver.impl.getPullChannel(chid); chst != datatransfer.EmptyChannelState {
baseCid := chst.BaseCID()
root := cidlink.Link{baseCid}
receiver.impl.gs.Request(ctx, sender, root, chst.Selector())
evt = datatransfer.Progress
}
}
receiver.impl.notifySubscribers(evt, chst)
}

// getPullChannel searches for a pull-type channel in the slice of channels with id `chid`.
// Returns datatransfer.EmptyChannelState if:
// * there is no channel with that id
Expand All @@ -320,24 +207,6 @@ func (impl *graphsyncImpl) getPullChannel(chid datatransfer.ChannelID) datatrans
return channelState
}

func (receiver *graphsyncReceiver) ReceiveError(error) {}

// nodeAsBytes serializes an ipld.Node
func nodeAsBytes(node ipld.Node) ([]byte, error) {
var buffer bytes.Buffer
err := dagcbor.Encoder(node, &buffer)
if err != nil {
return nil, err
}
return buffer.Bytes(), nil
}

// nodeFromBytes deserializes an ipld.Node
func nodeFromBytes(from []byte) (ipld.Node, error) {
reader := bytes.NewReader(from)
return dagcbor.Decoder(ipldfree.NodeBuilder(), reader)
}

// generateTransferID() generates a unique-to-runtime TransferID for use in creating
// ChannelIDs
func (impl *graphsyncImpl) generateTransferID() datatransfer.TransferID {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,176 +407,6 @@ func (sv *stubbedValidator) verifyExpectations(t *testing.T) {
}
}

func TestSendResponseToIncomingRequest(t *testing.T) {
// create network
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

gsData := newGraphsyncTestingData(t, ctx)
host1 := gsData.host1
host2 := gsData.host2

// setup receiving peer to just record message coming in
dtnet1 := network.NewFromLibp2pHost(host1)
r := &receiver{
messageReceived: make(chan receivedMessage),
}
dtnet1.SetDelegate(r)

gs2 := &fakeGraphSync{
receivedRequests: make(chan receivedGraphSyncRequest, 1),
}

voucher := fakeDTType{"applesauce"}
baseCid := testutil.GenerateCids(1)[0]
id := datatransfer.TransferID(rand.Int31())
var buffer bytes.Buffer
err := dagcbor.Encoder(gsData.allSelector, &buffer)
require.NoError(t, err)

t.Run("Response to push with successful validation", func(t *testing.T) {
sv := newSV()
sv.expectSuccessPush()

dt := NewGraphSyncDataTransfer(ctx, host2, gs2)
require.NoError(t, dt.RegisterVoucherType(reflect.TypeOf(&fakeDTType{}), sv))

isPull := false
voucherBytes, err := voucher.ToBytes()
require.NoError(t, err)
_ = message.NewRequest(id, isPull, voucher.Type(), voucherBytes, baseCid, buffer.Bytes())
request := message.NewRequest(id, isPull, voucher.Type(), voucherBytes, baseCid, buffer.Bytes())
require.NoError(t, dtnet1.SendMessage(ctx, host2.ID(), request))
var messageReceived receivedMessage
select {
case <-ctx.Done():
t.Fatal("did not receive message sent")
case messageReceived = <-r.messageReceived:
}

sv.verifyExpectations(t)

sender := messageReceived.sender
require.Equal(t, sender, host2.ID())

received := messageReceived.message
require.False(t, received.IsRequest())
receivedResponse, ok := received.(message.DataTransferResponse)
require.True(t, ok)

assert.Equal(t, receivedResponse.TransferID(), id)
require.True(t, receivedResponse.Accepted())

})

t.Run("Response to push with error validation", func(t *testing.T) {
sv := newSV()
sv.expectErrorPush()
dt := NewGraphSyncDataTransfer(ctx, host2, gs2)
err = dt.RegisterVoucherType(reflect.TypeOf(&fakeDTType{}), sv)
require.NoError(t, err)

isPull := false

voucherBytes, err := voucher.ToBytes()
require.NoError(t, err)
request := message.NewRequest(id, isPull, voucher.Type(), voucherBytes, baseCid, buffer.Bytes())
require.NoError(t, dtnet1.SendMessage(ctx, host2.ID(), request))

var messageReceived receivedMessage
select {
case <-ctx.Done():
t.Fatal("did not receive message sent")
case messageReceived = <-r.messageReceived:
}

sv.verifyExpectations(t)

sender := messageReceived.sender
require.Equal(t, sender, host2.ID())

received := messageReceived.message
require.False(t, received.IsRequest())
receivedResponse, ok := received.(message.DataTransferResponse)
require.True(t, ok)

require.Equal(t, receivedResponse.TransferID(), id)
require.False(t, receivedResponse.Accepted())
})

t.Run("Response to pull with successful validation", func(t *testing.T) {
sv := newSV()
sv.expectSuccessPull()

dt := NewGraphSyncDataTransfer(ctx, host2, gs2)
err = dt.RegisterVoucherType(reflect.TypeOf(&fakeDTType{}), sv)
require.NoError(t, err)

isPull := true

voucherBytes, err := voucher.ToBytes()
require.NoError(t, err)
request := message.NewRequest(id, isPull, voucher.Type(), voucherBytes, baseCid, buffer.Bytes())

require.NoError(t, dtnet1.SendMessage(ctx, host2.ID(), request))
var messageReceived receivedMessage
select {
case <-ctx.Done():
t.Fatal("did not receive message sent")
case messageReceived = <-r.messageReceived:
}

sv.verifyExpectations(t)

sender := messageReceived.sender
require.Equal(t, sender, host2.ID())

received := messageReceived.message
require.False(t, received.IsRequest())
receivedResponse, ok := received.(message.DataTransferResponse)
require.True(t, ok)

require.Equal(t, receivedResponse.TransferID(), id)
require.True(t, receivedResponse.Accepted())
})

t.Run("Response to push with error validation", func(t *testing.T) {
sv := newSV()
sv.expectErrorPull()

dt := NewGraphSyncDataTransfer(ctx, host2, gs2)
err = dt.RegisterVoucherType(reflect.TypeOf(&fakeDTType{}), sv)
require.NoError(t, err)

isPull := true
voucherBytes, err := voucher.ToBytes()
require.NoError(t, err)
request := message.NewRequest(id, isPull, voucher.Type(), voucherBytes, baseCid, buffer.Bytes())
require.NoError(t, dtnet1.SendMessage(ctx, host2.ID(), request))

var messageReceived receivedMessage
select {
case <-ctx.Done():
t.Fatal("did not receive message sent")
case messageReceived = <-r.messageReceived:
}

sv.verifyExpectations(t)

sender := messageReceived.sender
require.Equal(t, sender, host2.ID())

received := messageReceived.message
require.False(t, received.IsRequest())
receivedResponse, ok := received.(message.DataTransferResponse)
require.True(t, ok)

require.Equal(t, receivedResponse.TransferID(), id)
require.False(t, receivedResponse.Accepted())
})
}

func TestGraphsyncImpl_RegisterVoucherType(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
Expand Down
Loading

0 comments on commit f4ffd41

Please sign in to comment.