Skip to content

Commit

Permalink
The new data transfer (#55)
Browse files Browse the repository at this point in the history
* feat(datatransfer): define revalidation interfaces

* feat(message): add voucher results to message

* feat(graphsyncimpl): add send voucher

add send voucher function to data transfer, and create update options for send & receive

* refactor(datatransfer): minor cleanups

add utilties to channel state and extract integration tests

* feat(message): add pause to message

add a pause attribute message

* feat(graphsyncimpl): switch to using messages instead of custom extension

* feat(graphsyncimpl): convert to single roundtrip pull

Setup pull requests to immediately send a GS request

* refactor(datatransfer): refactor out transports

Move all transport specific actions outside of the manage, into a transport layer than abstracts the
underlying transport medium

* feat(channels): add channel status tracking

track channel status, add new events and statuses and move start/stop out of constructor

* feat(receiver): refactor to use transport

refactor receiver to use transport to handle responses

* fix(impl): bug fixes and changes to work for sure with transport protocol

* feat(channels): convert channel to cbor-gen

Conver Channel to cbor-gen type

* feat(channels): refactor to use fsm

Refactor channels to use FSM, so we can track statuses and publish events automatically

* feat(impl): implment pause/resume/cancel

also add some voucher result processing. this includes a major refactor of the testing strategy for
the implementation to be more of a unit test

* feat(datatransfer): complete pause & resume

Complete implementation of data transfer pause and resume

* feat(datatransfer): support pause on first validate

Modify voucher validation to support pauses on first validate

* feat(datatransfer): add pause resume vouchers

add the final step for a retrieval flow -- being able to stop and start via vouchers

* feat(network): add connection tagging

support connection tagging to protect connections from getting dropped

* refactor(impl): split up files a bit more

Split up functions into files so that they group better

* fix(lint): fix lint & mod tidy

* refactor(tests): pull out stubbed validators

* fix(test): switch to table test for idempotene

switch integration test to table test to make sure not to reuse constructs taht are already in use

* fix(receiver): only resume transport when possible

Only resume transport when it's possible for the transport to be resumed

* fix(deps): update graphsync
  • Loading branch information
hannahhoward authored Jul 8, 2020
1 parent 9d6a1ef commit d9d9a15
Show file tree
Hide file tree
Showing 48 changed files with 7,093 additions and 2,736 deletions.
19 changes: 11 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,19 @@ Install the module in your package or app with `go get "github.com/filecoin-proj

import (
gsimpl "github.com/ipfs/go-graphsync/impl"
"github.com/filecoin-project/go-data-transfer/datatransfer"
datatransfer "github.com/filecoin-project/go-data-transfer/impl"
gstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync"
"github.com/libp2p/go-libp2p-core/host"
)

```
1. Provide or create a [libp2p host.Host](https://github.com/libp2p/go-libp2p-examples/tree/master/libp2p-host)
1. Provide or create a [go-graphsync GraphExchange](https://github.com/ipfs/go-graphsync#initializing-a-graphsync-exchange)
1. Create a new instance of GraphsyncDataTransfer
1. You will need a transport protocol. The current default transport is graphsync. [go-graphsync GraphExchange](https://github.com/ipfs/go-graphsync#initializing-a-graphsync-exchange)
1. Create a data transfer by building a transport interface and then initializing a new data transfer instance
```go
func NewGraphsyncDatatransfer(h host.Host, gs graphsync.GraphExchange) {
dt := datatransfer.NewGraphSyncDataTransfer(h, gs)
func NewGraphsyncDataTransfer(h host.Host, gs graphsync.GraphExchange) {
tp := gstransport.NewTransport(h.ID(), gs)
dt := impl.NewDataTransfer(h, tp)
}
```

Expand Down Expand Up @@ -81,7 +83,7 @@ func (v *myVoucher) Type() string {
type myValidator struct {
ctx context.Context
validationsReceived chan receivedValidation
ValidationsReceived chan receivedValidation
}
func (vl *myValidator) ValidatePush(
Expand Down Expand Up @@ -126,15 +128,16 @@ by its `reflect.Type` and `dataTransfer.RequestValidator` for vouchers that
must be sent with the request. Using the trivial examples above:
```go
func NewGraphsyncDatatransfer(h host.Host, gs graphsync.GraphExchange) {
dt := datatransfer.NewGraphSyncDataTransfer(h, gs)
tp := gstransport.NewTransport(h.ID(), gs)
dt := impl.NewDataTransfer(h, tp)
vouch := &myVoucher{}
mv := &myValidator{}
dt.RegisterVoucherType(reflect.TypeOf(vouch), mv)
}
```

For more detail, please see the [unit tests](https://github.com/filecoin-project/go-data-transfer/blob/master/impl/graphsync/graphsync_impl_test.go).
For more detail, please see the [unit tests](https://github.com/filecoin-project/go-data-transfer/blob/master/impl/impl_test.go).

### Open a Push or Pull Request
For a push or pull request, provide a context, a `datatransfer.Voucher`, a host recipient `peer.ID`, a baseCID `cid.CID` and a selector `ipld.Node`. These
Expand Down
139 changes: 139 additions & 0 deletions channels/channel_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package channels

import (
"bytes"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
peer "github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"
)

// channelState is immutable channel data plus mutable state
type channelState struct {
// an identifier for this channel shared by request and responder, set by requester through protocol
transferID datatransfer.TransferID
// base CID for the piece being transferred
baseCid cid.Cid
// portion of Piece to return, specified by an IPLD selector
selector *cbg.Deferred
// the party that is sending the data (not who initiated the request)
sender peer.ID
// the party that is receiving the data (not who initiated the request)
recipient peer.ID
// expected amount of data to be transferred
totalSize uint64
// current status of this deal
status datatransfer.Status
// total bytes sent from this node (0 if receiver)
sent uint64
// total bytes received by this node (0 if sender)
received uint64
// more informative status on a channel
message string
// additional vouchers
vouchers []encodedVoucher
// additional voucherResults
voucherResults []encodedVoucherResult
voucherResultDecoder DecoderByTypeFunc
voucherDecoder DecoderByTypeFunc
}

// EmptyChannelState is the zero value for channel state, meaning not present
var EmptyChannelState = channelState{}

// Status is the current status of this channel
func (c channelState) Status() datatransfer.Status { return c.status }

// Sent returns the number of bytes sent
func (c channelState) Sent() uint64 { return c.sent }

// Received returns the number of bytes received
func (c channelState) Received() uint64 { return c.received }

// TransferID returns the transfer id for this channel
func (c channelState) TransferID() datatransfer.TransferID { return c.transferID }

// BaseCID returns the CID that is at the root of this data transfer
func (c channelState) BaseCID() cid.Cid { return c.baseCid }

// Selector returns the IPLD selector for this data transfer (represented as
// an IPLD node)
func (c channelState) Selector() ipld.Node {
builder := basicnode.Style.Any.NewBuilder()
reader := bytes.NewReader(c.selector.Raw)
err := dagcbor.Decoder(builder, reader)
if err != nil {
log.Error(err)
}
return builder.Build()
}

// Voucher returns the voucher for this data transfer
func (c channelState) Voucher() datatransfer.Voucher {
decoder, _ := c.voucherDecoder(c.vouchers[0].Type)
encodable, _ := decoder.DecodeFromCbor(c.vouchers[0].Voucher.Raw)
return encodable.(datatransfer.Voucher)
}

// Sender returns the peer id for the node that is sending data
func (c channelState) Sender() peer.ID { return c.sender }

// Recipient returns the peer id for the node that is receiving data
func (c channelState) Recipient() peer.ID { return c.recipient }

// TotalSize returns the total size for the data being transferred
func (c channelState) TotalSize() uint64 { return c.totalSize }

// IsPull returns whether this is a pull request based on who initiated it
func (c channelState) IsPull(initiator peer.ID) bool {
return initiator == c.recipient
}

func (c channelState) Message() string {
return c.message
}

func (c channelState) Vouchers() []datatransfer.Voucher {
vouchers := make([]datatransfer.Voucher, 0, len(c.vouchers))
for _, encoded := range c.vouchers {
decoder, _ := c.voucherDecoder(encoded.Type)
encodable, _ := decoder.DecodeFromCbor(encoded.Voucher.Raw)
vouchers = append(vouchers, encodable.(datatransfer.Voucher))
}
return vouchers
}

func (c channelState) LastVoucher() datatransfer.Voucher {
decoder, _ := c.voucherDecoder(c.vouchers[len(c.vouchers)-1].Type)
encodable, _ := decoder.DecodeFromCbor(c.vouchers[len(c.vouchers)-1].Voucher.Raw)
return encodable.(datatransfer.Voucher)
}

func (c channelState) LastVoucherResult() datatransfer.VoucherResult {
decoder, _ := c.voucherResultDecoder(c.voucherResults[len(c.voucherResults)-1].Type)
encodable, _ := decoder.DecodeFromCbor(c.voucherResults[len(c.voucherResults)-1].VoucherResult.Raw)
return encodable.(datatransfer.VoucherResult)
}

func (c channelState) VoucherResults() []datatransfer.VoucherResult {
voucherResults := make([]datatransfer.VoucherResult, 0, len(c.voucherResults))
for _, encoded := range c.voucherResults {
decoder, _ := c.voucherResultDecoder(encoded.Type)
encodable, _ := decoder.DecodeFromCbor(encoded.VoucherResult.Raw)
voucherResults = append(voucherResults, encodable.(datatransfer.VoucherResult))
}
return voucherResults
}

func (c channelState) OtherParty(thisParty peer.ID) peer.ID {
if thisParty == c.sender {
return c.recipient
}
return c.sender
}

var _ datatransfer.ChannelState = channelState{}
Loading

0 comments on commit d9d9a15

Please sign in to comment.