Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

migrate to consolidated types. #344

Merged
merged 1 commit into from
May 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ import (
"sync"
"time"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/routing"

"go.opencensus.io/tag"
"golang.org/x/xerrors"

Expand All @@ -22,15 +29,9 @@ import (
logging "github.com/ipfs/go-log"
goprocess "github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"
host "github.com/libp2p/go-libp2p-host"
kb "github.com/libp2p/go-libp2p-kbucket"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
protocol "github.com/libp2p/go-libp2p-protocol"
record "github.com/libp2p/go-libp2p-record"
recpb "github.com/libp2p/go-libp2p-record/pb"
routing "github.com/libp2p/go-libp2p-routing"
base32 "github.com/whyrusleeping/base32"
)

Expand All @@ -41,11 +42,11 @@ var logger = logging.Logger("dht")
const NumBootstrapQueries = 5

// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
// It is used to implement the base Routing module.
type IpfsDHT struct {
host host.Host // the network services we need
self peer.ID // Local peer (yourself)
peerstore pstore.Peerstore // Peer Registry
host host.Host // the network services we need
self peer.ID // Local peer (yourself)
peerstore peerstore.Peerstore // Peer Registry

datastore ds.Datastore // Local data

Expand All @@ -71,7 +72,7 @@ type IpfsDHT struct {
// guarantee, but we can use them to aid refactoring.
var (
_ routing.ContentRouting = (*IpfsDHT)(nil)
_ routing.IpfsRouting = (*IpfsDHT)(nil)
_ routing.Routing = (*IpfsDHT)(nil)
_ routing.PeerRouting = (*IpfsDHT)(nil)
_ routing.PubKeyFetcher = (*IpfsDHT)(nil)
_ routing.ValueStore = (*IpfsDHT)(nil)
Expand Down Expand Up @@ -182,7 +183,7 @@ var errInvalidRecord = errors.New("received invalid record")
// key. It returns either the value or a list of closer peers.
// NOTE: It will update the dht's peerstore with any new addresses
// it finds for the given peer.
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*pstore.PeerInfo, error) {
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*peer.AddrInfo, error) {

pmes, err := dht.getValueSingle(ctx, p, key)
if err != nil {
Expand Down Expand Up @@ -278,12 +279,12 @@ func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) {
}

// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
func (dht *IpfsDHT) FindLocal(id peer.ID) pstore.PeerInfo {
func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {
switch dht.host.Network().Connectedness(id) {
case inet.Connected, inet.CanConnect:
case network.Connected, network.CanConnect:
return dht.peerstore.PeerInfo(id)
default:
return pstore.PeerInfo{}
return peer.AddrInfo{}
}
}

Expand Down
10 changes: 5 additions & 5 deletions dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"fmt"
"time"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"

u "github.com/ipfs/go-ipfs-util"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
routing "github.com/libp2p/go-libp2p-routing"
multiaddr "github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multiaddr"
_ "github.com/multiformats/go-multiaddr-dns"
)

Expand Down Expand Up @@ -112,7 +112,7 @@ func newRandomPeerId() peer.ID {
}

// Traverse the DHT toward the given ID.
func (dht *IpfsDHT) walk(ctx context.Context, target peer.ID) (pstore.PeerInfo, error) {
func (dht *IpfsDHT) walk(ctx context.Context, target peer.ID) (peer.AddrInfo, error) {
// TODO: Extract the query action (traversal logic?) inside FindPeer,
// don't actually call through the FindPeer machinery, which can return
// things out of the peer store etc.
Expand Down
27 changes: 16 additions & 11 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@ import (
"sync"
"time"

ggio "github.com/gogo/protobuf/io"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/libp2p/go-libp2p-kad-dht/metrics"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"

ggio "github.com/gogo/protobuf/io"

"go.opencensus.io/stats"
"go.opencensus.io/tag"
)
Expand Down Expand Up @@ -55,8 +59,8 @@ func (w *bufferedDelimitedWriter) Flush() error {
return w.Writer.Flush()
}

// handleNewStream implements the inet.StreamHandler
func (dht *IpfsDHT) handleNewStream(s inet.Stream) {
// handleNewStream implements the network.StreamHandler
func (dht *IpfsDHT) handleNewStream(s network.Stream) {
defer s.Reset()
if dht.handleNewMessage(s) {
// Gracefully close the stream for writes.
Expand All @@ -65,9 +69,10 @@ func (dht *IpfsDHT) handleNewStream(s inet.Stream) {
}

// Returns true on orderly completion of writes (so we can Close the stream).
func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool {
func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
ctx := dht.ctx
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
r := ggio.NewDelimitedReader(s, network.MessageSizeMax)

mPeer := s.Conn().RemotePeer()

timer := time.AfterFunc(dhtStreamIdleTimeout, func() { s.Reset() })
Expand Down Expand Up @@ -242,7 +247,7 @@ func (dht *IpfsDHT) messageSenderForPeer(ctx context.Context, p peer.ID) (*messa
}

type messageSender struct {
s inet.Stream
s network.Stream
r ggio.ReadCloser
lk sync.Mutex
p peer.ID
Expand Down Expand Up @@ -286,7 +291,7 @@ func (ms *messageSender) prep(ctx context.Context) error {
return err
}

ms.r = ggio.NewDelimitedReader(nstr, inet.MessageSizeMax)
ms.r = ggio.NewDelimitedReader(nstr, network.MessageSizeMax)
ms.s = nstr

return nil
Expand Down Expand Up @@ -322,7 +327,7 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) erro
logger.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)

if ms.singleMes > streamReuseTries {
go inet.FullClose(ms.s)
go helpers.FullClose(ms.s)
ms.s = nil
} else if retry {
ms.singleMes++
Expand Down Expand Up @@ -371,7 +376,7 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb
logger.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)

if ms.singleMes > streamReuseTries {
go inet.FullClose(ms.s)
go helpers.FullClose(ms.s)
ms.s = nil
} else if retry {
ms.singleMes++
Expand Down
29 changes: 15 additions & 14 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import (
"testing"
"time"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/routing"

multistream "github.com/multiformats/go-multistream"

"golang.org/x/xerrors"
Expand All @@ -25,14 +29,11 @@ import (
cid "github.com/ipfs/go-cid"
u "github.com/ipfs/go-ipfs-util"
kb "github.com/libp2p/go-libp2p-kbucket"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
record "github.com/libp2p/go-libp2p-record"
routing "github.com/libp2p/go-libp2p-routing"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
ci "github.com/libp2p/go-libp2p-testing/ci"
travisci "github.com/libp2p/go-libp2p-testing/ci/travis"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
ci "github.com/libp2p/go-testutil/ci"
travisci "github.com/libp2p/go-testutil/ci/travis"
ma "github.com/multiformats/go-multiaddr"
)

Expand Down Expand Up @@ -127,8 +128,8 @@ func connectNoSync(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
t.Fatal("peers setup incorrectly: no local address")
}

a.peerstore.AddAddrs(idB, addrB, pstore.TempAddrTTL)
pi := pstore.PeerInfo{ID: idB}
a.peerstore.AddAddrs(idB, addrB, peerstore.TempAddrTTL)
pi := peer.AddrInfo{ID: idB}
if err := a.host.Connect(ctx, pi); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1012,7 +1013,7 @@ func TestFindPeersConnectedToPeer(t *testing.T) {
}

// shouldFind := []peer.ID{peers[1], peers[3]}
var found []*pstore.PeerInfo
var found []*peer.AddrInfo
for nextp := range pchan {
found = append(found, nextp)
}
Expand Down Expand Up @@ -1056,14 +1057,14 @@ func TestConnectCollision(t *testing.T) {

errs := make(chan error)
go func() {
dhtA.peerstore.AddAddr(peerB, addrB, pstore.TempAddrTTL)
pi := pstore.PeerInfo{ID: peerB}
dhtA.peerstore.AddAddr(peerB, addrB, peerstore.TempAddrTTL)
pi := peer.AddrInfo{ID: peerB}
err := dhtA.host.Connect(ctx, pi)
errs <- err
}()
go func() {
dhtB.peerstore.AddAddr(peerA, addrA, pstore.TempAddrTTL)
pi := pstore.PeerInfo{ID: peerA}
dhtB.peerstore.AddAddr(peerA, addrA, peerstore.TempAddrTTL)
pi := peer.AddrInfo{ID: peerA}
err := dhtB.host.Connect(ctx, pi)
errs <- err
}()
Expand Down Expand Up @@ -1373,7 +1374,7 @@ func TestPing(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ds := setupDHTS(t, ctx, 2)
ds[0].Host().Peerstore().AddAddrs(ds[1].PeerID(), ds[1].Host().Addrs(), pstore.AddressTTL)
ds[0].Host().Peerstore().AddAddrs(ds[1].PeerID(), ds[1].Host().Addrs(), peerstore.AddressTTL)
assert.NoError(t, ds[0].Ping(context.Background(), ds[1].PeerID()))
}

Expand All @@ -1382,7 +1383,7 @@ func TestClientModeAtInit(t *testing.T) {
defer cancel()
pinger := setupDHT(ctx, t, false)
client := setupDHT(ctx, t, true)
pinger.Host().Peerstore().AddAddrs(client.PeerID(), client.Host().Addrs(), pstore.AddressTTL)
pinger.Host().Peerstore().AddAddrs(client.PeerID(), client.Host().Addrs(), peerstore.AddressTTL)
err := pinger.Ping(context.Background(), client.PeerID())
assert.True(t, xerrors.Is(err, multistream.ErrNotSupported))
}
2 changes: 1 addition & 1 deletion dial_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"sync"
"time"

peer "github.com/libp2p/go-libp2p-peer"
"github.com/libp2p/go-libp2p-core/peer"
queue "github.com/libp2p/go-libp2p-peerstore/queue"
)

Expand Down
2 changes: 1 addition & 1 deletion dial_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"testing"
"time"

peer "github.com/libp2p/go-libp2p-peer"
"github.com/libp2p/go-libp2p-core/peer"
queue "github.com/libp2p/go-libp2p-peerstore/queue"
)

Expand Down
Loading