Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The new data transfer #55

Merged
merged 24 commits into from
Jul 8, 2020
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
446e299
feat(datatransfer): define revalidation interfaces
hannahhoward Apr 24, 2020
7ff1af2
feat(message): add voucher results to message
hannahhoward Apr 24, 2020
22fcb95
feat(graphsyncimpl): add send voucher
hannahhoward Jun 2, 2020
8e68aab
refactor(datatransfer): minor cleanups
hannahhoward Jun 2, 2020
b09a9a0
feat(message): add pause to message
hannahhoward Jun 10, 2020
2812529
feat(graphsyncimpl): switch to using messages instead of custom exten…
hannahhoward Jun 11, 2020
b770710
feat(graphsyncimpl): convert to single roundtrip pull
hannahhoward Jun 11, 2020
daa896b
refactor(datatransfer): refactor out transports
hannahhoward Jul 1, 2020
39534af
feat(channels): add channel status tracking
hannahhoward Jul 2, 2020
91e2565
feat(receiver): refactor to use transport
hannahhoward Jul 2, 2020
52b5eba
fix(impl): bug fixes and changes to work for sure with transport prot…
hannahhoward Jul 2, 2020
bf19f41
feat(channels): convert channel to cbor-gen
hannahhoward Jul 2, 2020
c24b792
feat(channels): refactor to use fsm
hannahhoward Jul 3, 2020
4ddee61
feat(impl): implment pause/resume/cancel
hannahhoward Jul 4, 2020
900ee34
feat(datatransfer): complete pause & resume
hannahhoward Jul 6, 2020
fa3dd1e
feat(datatransfer): support pause on first validate
hannahhoward Jul 6, 2020
21d6c53
feat(datatransfer): add pause resume vouchers
hannahhoward Jul 7, 2020
b3b3bb6
feat(network): add connection tagging
hannahhoward Jul 7, 2020
77f3f13
refactor(impl): split up files a bit more
hannahhoward Jul 7, 2020
115bdc4
fix(lint): fix lint & mod tidy
hannahhoward Jul 7, 2020
466695d
refactor(tests): pull out stubbed validators
hannahhoward Jul 7, 2020
d73c03f
fix(test): switch to table test for idempotene
hannahhoward Jul 7, 2020
756600a
fix(receiver): only resume transport when possible
hannahhoward Jul 7, 2020
8d924a0
fix(deps): update graphsync
hannahhoward Jul 8, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,19 @@ Install the module in your package or app with `go get "github.com/filecoin-proj

import (
gsimpl "github.com/ipfs/go-graphsync/impl"
"github.com/filecoin-project/go-data-transfer/datatransfer"
datatransfer "github.com/filecoin-project/go-data-transfer/impl"
gstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync"
"github.com/libp2p/go-libp2p-core/host"
)

```
1. Provide or create a [libp2p host.Host](https://github.com/libp2p/go-libp2p-examples/tree/master/libp2p-host)
1. Provide or create a [go-graphsync GraphExchange](https://github.com/ipfs/go-graphsync#initializing-a-graphsync-exchange)
1. Create a new instance of GraphsyncDataTransfer
1. You will need a transport protocol. The current default transport is graphsync. [go-graphsync GraphExchange](https://github.com/ipfs/go-graphsync#initializing-a-graphsync-exchange)
1. Create a data transfer by building a transport interface and then initializing a new data transfer instance
```go
func NewGraphsyncDatatransfer(h host.Host, gs graphsync.GraphExchange) {
dt := datatransfer.NewGraphSyncDataTransfer(h, gs)
func NewGraphsyncDataTransfer(h host.Host, gs graphsync.GraphExchange) {
tp := gstransport.NewTransport(h.ID(), gs)
dt := impl.NewDataTransfer(h, tp)
}
```

Expand Down Expand Up @@ -81,7 +83,7 @@ func (v *myVoucher) Type() string {

type myValidator struct {
ctx context.Context
validationsReceived chan receivedValidation
ValidationsReceived chan receivedValidation
}

func (vl *myValidator) ValidatePush(
Expand Down Expand Up @@ -126,15 +128,16 @@ by its `reflect.Type` and `dataTransfer.RequestValidator` for vouchers that
must be sent with the request. Using the trivial examples above:
```go
func NewGraphsyncDatatransfer(h host.Host, gs graphsync.GraphExchange) {
dt := datatransfer.NewGraphSyncDataTransfer(h, gs)
tp := gstransport.NewTransport(h.ID(), gs)
dt := impl.NewDataTransfer(h, tp)

vouch := &myVoucher{}
mv := &myValidator{}
dt.RegisterVoucherType(reflect.TypeOf(vouch), mv)
}
```

For more detail, please see the [unit tests](https://github.com/filecoin-project/go-data-transfer/blob/master/impl/graphsync/graphsync_impl_test.go).
For more detail, please see the [unit tests](https://github.com/filecoin-project/go-data-transfer/blob/master/impl/impl_test.go).

### Open a Push or Pull Request
For a push or pull request, provide a context, a `datatransfer.Voucher`, a host recipient `peer.ID`, a baseCID `cid.CID` and a selector `ipld.Node`. These
Expand Down
139 changes: 139 additions & 0 deletions channels/channel_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package channels

import (
"bytes"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
peer "github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"
)

// channelState is immutable channel data plus mutable state
type channelState struct {
// an identifier for this channel shared by request and responder, set by requester through protocol
transferID datatransfer.TransferID
// base CID for the piece being transferred
baseCid cid.Cid
// portion of Piece to return, specified by an IPLD selector
selector *cbg.Deferred
// the party that is sending the data (not who initiated the request)
sender peer.ID
// the party that is receiving the data (not who initiated the request)
recipient peer.ID
// expected amount of data to be transferred
totalSize uint64
// current status of this deal
status datatransfer.Status
// total bytes sent from this node (0 if receiver)
sent uint64
// total bytes received by this node (0 if sender)
received uint64
// more informative status on a channel
message string
// additional vouchers
vouchers []encodedVoucher
// additional voucherResults
voucherResults []encodedVoucherResult
voucherResultDecoder DecoderByTypeFunc
voucherDecoder DecoderByTypeFunc
}

// EmptyChannelState is the zero value for channel state, meaning not present
var EmptyChannelState = channelState{}

// Status is the current status of this channel
func (c channelState) Status() datatransfer.Status { return c.status }

// Sent returns the number of bytes sent
func (c channelState) Sent() uint64 { return c.sent }

// Received returns the number of bytes received
func (c channelState) Received() uint64 { return c.received }

// TransferID returns the transfer id for this channel
func (c channelState) TransferID() datatransfer.TransferID { return c.transferID }

// BaseCID returns the CID that is at the root of this data transfer
func (c channelState) BaseCID() cid.Cid { return c.baseCid }

// Selector returns the IPLD selector for this data transfer (represented as
// an IPLD node)
func (c channelState) Selector() ipld.Node {
builder := basicnode.Style.Any.NewBuilder()
reader := bytes.NewReader(c.selector.Raw)
err := dagcbor.Decoder(builder, reader)
if err != nil {
log.Error(err)
}
return builder.Build()
}

// Voucher returns the voucher for this data transfer
func (c channelState) Voucher() datatransfer.Voucher {
decoder, _ := c.voucherDecoder(c.vouchers[0].Type)
encodable, _ := decoder.DecodeFromCbor(c.vouchers[0].Voucher.Raw)
return encodable.(datatransfer.Voucher)
}

// Sender returns the peer id for the node that is sending data
func (c channelState) Sender() peer.ID { return c.sender }

// Recipient returns the peer id for the node that is receiving data
func (c channelState) Recipient() peer.ID { return c.recipient }

// TotalSize returns the total size for the data being transferred
func (c channelState) TotalSize() uint64 { return c.totalSize }

// IsPull returns whether this is a pull request based on who initiated it
func (c channelState) IsPull(initiator peer.ID) bool {
return initiator == c.recipient
}

func (c channelState) Message() string {
return c.message
}

func (c channelState) Vouchers() []datatransfer.Voucher {
vouchers := make([]datatransfer.Voucher, 0, len(c.vouchers))
for _, encoded := range c.vouchers {
decoder, _ := c.voucherDecoder(encoded.Type)
encodable, _ := decoder.DecodeFromCbor(encoded.Voucher.Raw)
vouchers = append(vouchers, encodable.(datatransfer.Voucher))
}
return vouchers
}

func (c channelState) LastVoucher() datatransfer.Voucher {
decoder, _ := c.voucherDecoder(c.vouchers[len(c.vouchers)-1].Type)
encodable, _ := decoder.DecodeFromCbor(c.vouchers[len(c.vouchers)-1].Voucher.Raw)
return encodable.(datatransfer.Voucher)
}

func (c channelState) LastVoucherResult() datatransfer.VoucherResult {
decoder, _ := c.voucherResultDecoder(c.voucherResults[len(c.voucherResults)-1].Type)
encodable, _ := decoder.DecodeFromCbor(c.voucherResults[len(c.voucherResults)-1].VoucherResult.Raw)
return encodable.(datatransfer.VoucherResult)
}

func (c channelState) VoucherResults() []datatransfer.VoucherResult {
voucherResults := make([]datatransfer.VoucherResult, 0, len(c.voucherResults))
for _, encoded := range c.voucherResults {
decoder, _ := c.voucherResultDecoder(encoded.Type)
encodable, _ := decoder.DecodeFromCbor(encoded.VoucherResult.Raw)
voucherResults = append(voucherResults, encodable.(datatransfer.VoucherResult))
}
return voucherResults
}

func (c channelState) OtherParty(thisParty peer.ID) peer.ID {
if thisParty == c.sender {
return c.recipient
}
return c.sender
}

var _ datatransfer.ChannelState = channelState{}
Loading