Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/find providers #43

Merged
merged 4 commits into from
Jan 15, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions pieceio/cario/cario.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package cario
import (
"context"
"fmt"
"github.com/filecoin-project/go-fil-markets/pieceio"
"io"

"github.com/ipfs/go-car"
"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/traversal/selector"
"io"

"github.com/filecoin-project/go-fil-markets/pieceio"
)

type carIO struct {
Expand Down
5 changes: 3 additions & 2 deletions retrievalmarket/discovery/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ func (l *Local) AddPeer(cid cid.Cid, peer retrievalmarket.RetrievalPeer) error {
return l.ds.Put(dshelp.CidToDsKey(cid), entry)
}

func (l *Local) GetPeers(data cid.Cid) ([]retrievalmarket.RetrievalPeer, error) {
entry, err := l.ds.Get(dshelp.CidToDsKey(data))
func (l *Local) GetPeers(pieceCID []byte) ([]retrievalmarket.RetrievalPeer, error) {
key := string(pieceCID[:])
entry, err := l.ds.Get(datastore.NewKey(key))
if err == datastore.ErrNotFound {
return []retrievalmarket.RetrievalPeer{}, nil
}
Expand Down
36 changes: 23 additions & 13 deletions retrievalmarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,19 @@ package retrievalimpl

import (
"context"
"errors"
"reflect"
"sync"

"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/clientstates"

"github.com/filecoin-project/go-address"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/clientstates"
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
"github.com/filecoin-project/go-fil-markets/shared/tokenamount"
)
Expand All @@ -28,19 +27,25 @@ type client struct {
node retrievalmarket.RetrievalClientNode
// The parameters should be replaced by RetrievalClientNode

nextDealLk sync.RWMutex
nextDealID retrievalmarket.DealID
nextDealLk sync.RWMutex
nextDealID retrievalmarket.DealID

subscribersLk sync.RWMutex
subscribers []retrievalmarket.ClientSubscriber
subscribers []retrievalmarket.ClientSubscriber
resolver retrievalmarket.PeerResolver
}

// NewClient creates a new retrieval client
func NewClient(network rmnet.RetrievalMarketNetwork, bs blockstore.Blockstore, node retrievalmarket.RetrievalClientNode) retrievalmarket.RetrievalClient {
func NewClient(
network rmnet.RetrievalMarketNetwork,
bs blockstore.Blockstore,
node retrievalmarket.RetrievalClientNode,
resolver retrievalmarket.PeerResolver) retrievalmarket.RetrievalClient {
return &client{
network: network,
bs: bs,
node: node,
network: network,
bs: bs,
node: node,
resolver: resolver,
}
}

Expand All @@ -49,7 +54,12 @@ func NewClient(network rmnet.RetrievalMarketNetwork, bs blockstore.Blockstore, n
// TODO: Implement for retrieval provider V0 epic
// https://github.com/filecoin-project/go-retrieval-market-project/issues/12
func (c *client) FindProviders(pieceCID []byte) []retrievalmarket.RetrievalPeer {
panic("not implemented")
peers, err := c.resolver.GetPeers(pieceCID)
if err != nil {
log.Error(err)
return []retrievalmarket.RetrievalPeer{}
}
return peers
}

// TODO: Update to match spec for V0 epic
Expand Down Expand Up @@ -138,7 +148,7 @@ func (c *client) handleDeal(ctx context.Context, dealState retrievalmarket.Clien
case retrievalmarket.DealStatusFundsNeeded, retrievalmarket.DealStatusFundsNeededLastPayment:
handler = clientstates.ProcessNextResponse
default:
c.failDeal(&dealState, errors.New("unexpected deal state"))
c.failDeal(&dealState, xerrors.New("unexpected deal state"))
return
}
dealModifier := handler(ctx, environment, dealState)
Expand Down
80 changes: 76 additions & 4 deletions retrievalmarket/impl/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package retrievalimpl_test

import (
"context"
"errors"
"testing"

"github.com/filecoin-project/go-address"
Expand All @@ -18,6 +19,7 @@ import (
"github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
"github.com/filecoin-project/go-fil-markets/shared/tokenamount"
"github.com/filecoin-project/go-fil-markets/shared/types"
tut "github.com/filecoin-project/go-fil-markets/shared_testutil"
)

Expand Down Expand Up @@ -56,7 +58,7 @@ func TestClient_Query(t *testing.T) {
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: tut.ExpectPeerOnQueryStreamBuilder(t, expectedPeer, qsb, "Peers should match"),
})
c := retrievalimpl.NewClient(net, bs, testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}))
c := retrievalimpl.NewClient(net, bs, testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}), &testPeerResolver{})

resp, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
require.NoError(t, err)
Expand All @@ -68,7 +70,8 @@ func TestClient_Query(t *testing.T) {
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: tut.FailNewQueryStream,
})
c := retrievalimpl.NewClient(net, bs, testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}))
c := retrievalimpl.NewClient(net, bs,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}), &testPeerResolver{})

_, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
assert.EqualError(t, err, "new query stream failed")
Expand All @@ -87,7 +90,8 @@ func TestClient_Query(t *testing.T) {
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: qsbuilder,
})
c := retrievalimpl.NewClient(net, bs, testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}))
c := retrievalimpl.NewClient(net, bs,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}), &testPeerResolver{})

statusCode, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
assert.EqualError(t, err, "write query failed")
Expand All @@ -105,10 +109,78 @@ func TestClient_Query(t *testing.T) {
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: qsbuilder,
})
c := retrievalimpl.NewClient(net, bs, testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}))
c := retrievalimpl.NewClient(
net,
bs,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}),
&testPeerResolver{})

statusCode, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
assert.EqualError(t, err, "query response failed")
assert.Equal(t, retrievalmarket.QueryResponseUndefined, statusCode)
})
}

func TestClient_FindProviders(t *testing.T) {
bs := bstore.NewBlockstore(dss.MutexWrap(datastore.NewMapDatastore()))
expectedPeer := peer.ID("somevalue")

var qsb tut.QueryStreamBuilder = func(p peer.ID) (rmnet.RetrievalQueryStream, error) {
return tut.NewTestRetrievalQueryStream(tut.TestQueryStreamParams{
Writer: tut.TrivialQueryWriter,
RespReader: tut.TrivialQueryResponseReader,
}), nil
}
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: tut.ExpectPeerOnQueryStreamBuilder(t, expectedPeer, qsb, "Peers should match"),
})

t.Run("when providers are found, returns providers", func(t *testing.T) {
peers := tut.RequireGenerateRetrievalPeers(t, 3)
testResolver := testPeerResolver{peers: peers}

c := retrievalimpl.NewClient(net, bs, &testRetrievalNode{}, &testResolver)
testCid := []byte("somePieceCID")
assert.Len(t, c.FindProviders(testCid), 3)
})

t.Run("when there is an error, returns empty provider list", func(t *testing.T) {
testResolver := testPeerResolver{peers: []retrievalmarket.RetrievalPeer{}, resolverError: errors.New("boom")}
c := retrievalimpl.NewClient(net, bs, &testRetrievalNode{}, &testResolver)
badCid := []byte("doesn't matter")
assert.Len(t, c.FindProviders(badCid), 0)
})

t.Run("when there are no providers", func(t *testing.T) {
testResolver := testPeerResolver{peers: []retrievalmarket.RetrievalPeer{}}
c := retrievalimpl.NewClient(net, bs, &testRetrievalNode{}, &testResolver)
testCid := []byte("unimportant")
assert.Len(t, c.FindProviders(testCid), 0)
})
}

type testRetrievalNode struct {
shannonwells marked this conversation as resolved.
Show resolved Hide resolved
}

func (t *testRetrievalNode) GetOrCreatePaymentChannel(ctx context.Context, clientAddress address.Address, minerAddress address.Address, clientFundsAvailable tokenamount.TokenAmount) (address.Address, error) {
return address.Address{}, nil
}

func (t *testRetrievalNode) AllocateLane(paymentChannel address.Address) (uint64, error) {
return 0, nil
}

func (t *testRetrievalNode) CreatePaymentVoucher(ctx context.Context, paymentChannel address.Address, amount tokenamount.TokenAmount, lane uint64) (*types.SignedVoucher, error) {
return nil, nil
}

type testPeerResolver struct {
peers []retrievalmarket.RetrievalPeer
resolverError error
}

var _ retrievalmarket.PeerResolver = &testPeerResolver{}

func (tpr testPeerResolver) GetPeers( []byte) ([]retrievalmarket.RetrievalPeer, error) {
return tpr.peers, tpr.resolverError
}
2 changes: 1 addition & 1 deletion retrievalmarket/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ type RetrievalProviderNode interface {

// PeerResolver is an interface for looking up providers that may have a piece
type PeerResolver interface {
GetPeers(data cid.Cid) ([]RetrievalPeer, error) // TODO: channel
GetPeers(pieceCID []byte) ([]RetrievalPeer, error) // TODO: channel
}

// RetrievalPeer is a provider address/peer.ID pair (everything needed to make
Expand Down
18 changes: 18 additions & 0 deletions shared_testutil/test_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package shared_testutil
import (
"math/big"
"math/rand"
"testing"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-data-transfer/testutil"
"github.com/libp2p/go-libp2p-core/test"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/shared/tokenamount"
Expand Down Expand Up @@ -107,3 +110,18 @@ func MakeTestDealPayment() retrievalmarket.DealPayment {
PaymentVoucher: MakeTestSignedVoucher(),
}
}

func RequireGenerateRetrievalPeers(t *testing.T, numPeers int) []retrievalmarket.RetrievalPeer {
peers := make([]retrievalmarket.RetrievalPeer, numPeers)
for i := range peers {
pid, err := test.RandPeerID()
require.NoError(t, err)
addr, err := address.NewIDAddress(rand.Uint64())
require.NoError(t, err)
peers[i] = retrievalmarket.RetrievalPeer{
Address: addr,
ID: pid,
}
}
return peers
}