Skip to content

Commit

Permalink
client accepts a peer resolver
Browse files Browse the repository at this point in the history
  • Loading branch information
shannonwells committed Jan 14, 2020
1 parent 8cf2e3b commit 93f7ddf
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 11 deletions.
36 changes: 26 additions & 10 deletions retrievalmarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (

"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/clientstates"

"github.com/filecoin-project/go-address"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
"github.com/filecoin-project/go-fil-markets/shared/tokenamount"
Expand All @@ -28,28 +28,44 @@ type client struct {
node retrievalmarket.RetrievalClientNode
// The parameters should be replaced by RetrievalClientNode

nextDealLk sync.RWMutex
nextDealID retrievalmarket.DealID
nextDealLk sync.RWMutex
nextDealID retrievalmarket.DealID

subscribersLk sync.RWMutex
subscribers []retrievalmarket.ClientSubscriber
subscribers []retrievalmarket.ClientSubscriber
resolver retrievalmarket.PeerResolver
}

// NewClient creates a new retrieval client
func NewClient(network rmnet.RetrievalMarketNetwork, bs blockstore.Blockstore, node retrievalmarket.RetrievalClientNode) retrievalmarket.RetrievalClient {
func NewClient(network rmnet.RetrievalMarketNetwork, bs blockstore.Blockstore, node retrievalmarket.RetrievalClientNode, resolver retrievalmarket.PeerResolver) retrievalmarket.RetrievalClient {
return &client{
network: network,
bs: bs,
node: node,
network: network,
bs: bs,
node: node,
resolver: resolver,
}
}

// V0

// TODO: Implement for retrieval provider V0 epic
// https://github.com/filecoin-project/go-retrieval-market-project/issues/12
func (c *client) FindProviders(pieceCID []byte) []retrievalmarket.RetrievalPeer {
panic("not implemented")
func (c *client) FindProviders(pieceCIDBytes []byte) []retrievalmarket.RetrievalPeer {
cidLen, pieceCid, err := cid.CidFromBytes(pieceCIDBytes)
if err != nil {
log.Error(err)
return []retrievalmarket.RetrievalPeer{}
}
if cidLen == 0 {
log.Error(errors.New("zero-length CID"))
return []retrievalmarket.RetrievalPeer{}
}
peers, err := c.resolver.GetPeers(pieceCid)
if err != nil {
log.Error(err)
return []retrievalmarket.RetrievalPeer{}
}
return peers
}

// TODO: Update to match spec for V0 epic
Expand Down
35 changes: 34 additions & 1 deletion retrievalmarket/impl/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/filecoin-project/go-address"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
dss "github.com/ipfs/go-datastore/sync"
bstore "github.com/ipfs/go-ipfs-blockstore"
Expand Down Expand Up @@ -57,6 +58,7 @@ func TestClient_Query(t *testing.T) {
QueryStreamBuilder: tut.ExpectPeerOnQueryStreamBuilder(t, expectedPeer, qsb, "Peers should match"),
})
c := retrievalimpl.NewClient(net, bs, testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}))
c := retrievalimpl.NewClient(net, bs, &testRetrievalNode{}, &testPeerResolver{})

resp, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
require.NoError(t, err)
Expand All @@ -69,6 +71,7 @@ func TestClient_Query(t *testing.T) {
QueryStreamBuilder: tut.FailNewQueryStream,
})
c := retrievalimpl.NewClient(net, bs, testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}))
c := retrievalimpl.NewClient(net, bs, &testRetrievalNode{}, &testPeerResolver{})

_, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
assert.EqualError(t, err, "new query stream failed")
Expand All @@ -88,6 +91,7 @@ func TestClient_Query(t *testing.T) {
QueryStreamBuilder: qsbuilder,
})
c := retrievalimpl.NewClient(net, bs, testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}))
c := retrievalimpl.NewClient(net, bs, &testRetrievalNode{}, &testPeerResolver{})

statusCode, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
assert.EqualError(t, err, "write query failed")
Expand All @@ -105,10 +109,39 @@ func TestClient_Query(t *testing.T) {
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: qsbuilder,
})
c := retrievalimpl.NewClient(net, bs, testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}))
c := retrievalimpl.NewClient(
net,
bs,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}),
&testPeerResolver{})

statusCode, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
assert.EqualError(t, err, "query response failed")
assert.Equal(t, retrievalmarket.QueryResponseUndefined, statusCode)
})
}

type testRetrievalNode struct {
}

func (t *testRetrievalNode) GetOrCreatePaymentChannel(ctx context.Context, clientAddress address.Address, minerAddress address.Address, clientFundsAvailable tokenamount.TokenAmount) (address.Address, error) {
return address.Address{}, nil
}

func (t *testRetrievalNode) AllocateLane(paymentChannel address.Address) (uint64, error) {
return 0, nil
}

func (t *testRetrievalNode) CreatePaymentVoucher(ctx context.Context, paymentChannel address.Address, amount tokenamount.TokenAmount, lane uint64) (*types.SignedVoucher, error) {
return nil, nil
}

type testPeerResolver struct {
peers []retrievalmarket.RetrievalPeer
}
var _ retrievalmarket.PeerResolver = &testPeerResolver{}

func (t testPeerResolver) GetPeers(data cid.Cid) ([]retrievalmarket.RetrievalPeer, error) {
return t.peers, nil
}

0 comments on commit 93f7ddf

Please sign in to comment.