Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make new stream calls accept a context #8

Merged
merged 1 commit into from
Nov 20, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ func (h *BasicHost) RemoveStreamHandler(pid protocol.ID) {
// header with given protocol.ID. If there is no connection to p, attempts
// to create one. If ProtocolID is "", writes no header.
// (Threadsafe)
func (h *BasicHost) NewStream(pid protocol.ID, p peer.ID) (inet.Stream, error) {
s, err := h.Network().NewStream(p)
func (h *BasicHost) NewStream(ctx context.Context, pid protocol.ID, p peer.ID) (inet.Stream, error) {
s, err := h.Network().NewStream(ctx, p)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion p2p/host/basic/basic_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestHostSimple(t *testing.T) {
io.Copy(w, s) // mirror everything
})

s, err := h1.NewStream(protocol.TestingID, h2pi.ID)
s, err := h1.NewStream(ctx, protocol.TestingID, h2pi.ID)
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion p2p/host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type Host interface {
// header with given protocol.ID. If there is no connection to p, attempts
// to create one. If ProtocolID is "", writes no header.
// (Threadsafe)
NewStream(pid protocol.ID, p peer.ID) (inet.Stream, error)
NewStream(ctx context.Context, pid protocol.ID, p peer.ID) (inet.Stream, error)

// Close shuts down the host, its Network, and services.
Close() error
Expand Down
4 changes: 2 additions & 2 deletions p2p/host/routed/routed.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ func (rh *RoutedHost) RemoveStreamHandler(pid protocol.ID) {
rh.host.RemoveStreamHandler(pid)
}

func (rh *RoutedHost) NewStream(pid protocol.ID, p peer.ID) (inet.Stream, error) {
return rh.host.NewStream(pid, p)
func (rh *RoutedHost) NewStream(ctx context.Context, pid protocol.ID, p peer.ID) (inet.Stream, error) {
return rh.host.NewStream(ctx, pid, p)
}
func (rh *RoutedHost) Close() error {
// no need to close IpfsRouting. we dont own it.
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type Network interface {

// NewStream returns a new stream to given peer p.
// If there is no connection to p, attempts to create one.
NewStream(peer.ID) (Stream, error)
NewStream(context.Context, peer.ID) (Stream, error)

// Listen tells the network to start listening on given multiaddrs.
Listen(...ma.Multiaddr) error
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/mock/mock_peernet.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (pn *peernet) Connectedness(p peer.ID) inet.Connectedness {

// NewStream returns a new stream to given peer p.
// If there is no connection to p, attempts to create one.
func (pn *peernet) NewStream(p peer.ID) (inet.Stream, error) {
func (pn *peernet) NewStream(ctx context.Context, p peer.ID) (inet.Stream, error) {
pn.Lock()
cs, found := pn.connsByPeer[p]
if !found || len(cs) < 1 {
Expand Down
28 changes: 16 additions & 12 deletions p2p/net/mock/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
protocol "github.com/ipfs/go-libp2p/p2p/protocol"
testutil "github.com/ipfs/go-libp2p/testutil"

context "gx/QmacZi9WygGK7Me8mH53pypyscHzU386aUZXpr28GZgUct/context"
detectrace "github.com/jbenet/go-detect-race"
context "gx/QmacZi9WygGK7Me8mH53pypyscHzU386aUZXpr28GZgUct/context"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why cant this be ipfs/? or gx/ipfs/ ? (preserving paths is important. adding ipns support and other things will be possible.

please please avoid breaking protocol naming. this is the kinda crap that makes it hard to nest + reuse protocols.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

packages will never use ipns paths. that breaks the whole point of using hashes for package references. ipns names can be used for repos just fine, as those are expected to be mutable.

if a package ref were tied to an ipns hash, then installing the same package two different times may result in a broken package, as the entry may update in between installs. I can switch it to just ipfs/ instead of gx/, david and i thought that gx would be a nice prefix for branding and for making it easy to figure out why i have random hash directories sitting around different places.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

packages will never use ipns paths. that breaks the whole point of using hashes for package references.

not necessarily. there could be other protocols.

if a package ref were tied to an ipns hash, then installing the same package two different times may result in a broken package, as the entry may update in between installs.

agreed, that's how all of Go works today

I can switch it to just ipfs/ instead of gx/, david and i thought that gx would be a nice prefix for branding and for making it easy to figure out why i have random hash directories sitting around different places.

i dont think it should be gx/hash. other protocols may be needed. i think it should be either gx/ipfs/hash (proper nesting) or just ipfs/hash.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@noffle @diasdavid, thoughts here? i'm amenable to changing things to ipfs/hash but too much testing is less fun.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gx/ipfs/hashcould be good when we start "mounting everything" and just let the OS figure out someone is requesting for a ipfs link and fetches it from ipfs. Having it gx is kind of nice because keeps things organised on the local fs (+it is really good for devs to look and tell first hand "oh, this uses gx to fetch the dependency)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gx/ doesn't encode protocol information re retrieval; @jbenet makes a good point here. However, the import path here doesn't need to encode protocol information. @whyrusleeping, I think we talked about something like a .git directory (so, .gx) that stored each package's local installation FS path (gx/<hash> here). And then the package.json would define that package's protocol path (e.g. /ipns/<hash>). My only qualm with adding another layer of nesting is that diminishes the user experience (try using command line tools in Java projects -- ha).

gx/[ipfs|ipns]/: encodes the protocol underneath the gx/ prefix. Nice that it matches up with what's in package.json, but not necessary (as per above). I like the 1:1 matching of FS path and location here.

I don't really like ipfs/hash/, since drops the gx substring. Having it might be nice for identifying what this folder means when looking at a project using gx.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't need to encode protocol information

this breaks encapsulation and proper layering. it doesnt need another
protocol today, but it makes it way harder for us to evolve gx tomorrow --
supposing we had some other prefix (either someone else's protocol, or even
an ipfs protocol that made sense). embedding the assumption that gx/ always
means
ipfs/ is a source of issues. either allow full paths, or gx-nested
full paths, but not gx-nested partial paths.

On Fri, Nov 20, 2015 at 3:39 PM Stephen Whitmore [email protected]
wrote:

In p2p/net/mock/mock_test.go
#8 (comment):

detectrace "github.com/jbenet/go-detect-race"
  • context "gx/QmacZi9WygGK7Me8mH53pypyscHzU386aUZXpr28GZgUct/context"

gx/ doesn't encode protocol information re retrieval; @jbenet
https://github.com/jbenet makes a good point here. However, the import
path here doesn't need to encode protocol information. @whyrusleeping
https://github.com/whyrusleeping, I think we talked about something
like a .git directory (so, .gx) that stored each package's local
installation FS path (gx/ here). And then the package.json would
define that package's protocol path (e.g. /ipns/). My only qualm
with adding another layer of nesting is that diminishes the user experience
(try using command line tools in Java projects -- ha).

gx/[ipfs|ipns]/: encodes the protocol underneath the gx/ prefix. Nice
that it matches up with what's in package.json, but not necessary (as per
above). I like the 1:1 matching of FS path and location here.

I don't really like ipfs/hash/, since drops the gx substring. Having it
might be nice for identifying what this folder means when looking at a
project using gx.


Reply to this email directly or view it on GitHub
https://github.com/ipfs/go-libp2p/pull/8/files#r45533043.

)

func randPeer(t *testing.T) peer.ID {
Expand Down Expand Up @@ -208,21 +208,21 @@ func TestNetworkSetup(t *testing.T) {
// p.NetworkConns(n3)

// can create a stream 2->3, 3->2,
if _, err := n2.NewStream(p3); err != nil {
if _, err := n2.NewStream(ctx, p3); err != nil {
t.Error(err)
}
if _, err := n3.NewStream(p2); err != nil {
if _, err := n3.NewStream(ctx, p2); err != nil {
t.Error(err)
}

// but not 1->2 nor 2->2 (not linked), nor 1->1 (not connected)
if _, err := n1.NewStream(p2); err == nil {
if _, err := n1.NewStream(ctx, p2); err == nil {
t.Error("should not be able to connect")
}
if _, err := n2.NewStream(p2); err == nil {
if _, err := n2.NewStream(ctx, p2); err == nil {
t.Error("should not be able to connect")
}
if _, err := n1.NewStream(p1); err == nil {
if _, err := n1.NewStream(ctx, p1); err == nil {
t.Error("should not be able to connect")
}

Expand All @@ -232,7 +232,7 @@ func TestNetworkSetup(t *testing.T) {
}

// and a stream too
if _, err := n1.NewStream(p1); err != nil {
if _, err := n1.NewStream(ctx, p1); err != nil {
t.Error(err)
}

Expand Down Expand Up @@ -265,13 +265,14 @@ func TestNetworkSetup(t *testing.T) {
}

// and a stream should work now too :)
if _, err := n2.NewStream(p3); err != nil {
if _, err := n2.NewStream(ctx, p3); err != nil {
t.Error(err)
}

}

func TestStreams(t *testing.T) {
ctx := context.Background()

mn, err := FullMeshConnected(context.Background(), 3)
if err != nil {
Expand All @@ -297,7 +298,7 @@ func TestStreams(t *testing.T) {
h.SetStreamHandler(protocol.TestingID, handler)
}

s, err := hosts[0].NewStream(protocol.TestingID, hosts[1].ID())
s, err := hosts[0].NewStream(ctx, protocol.TestingID, hosts[1].ID())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -361,6 +362,7 @@ func makePonger(st string) func(inet.Stream) {
}

func TestStreamsStress(t *testing.T) {
ctx := context.Background()
nnodes := 100
if detectrace.WithRace() {
nnodes = 50
Expand All @@ -384,7 +386,7 @@ func TestStreamsStress(t *testing.T) {
defer wg.Done()
from := rand.Intn(len(hosts))
to := rand.Intn(len(hosts))
s, err := hosts[from].NewStream(protocol.TestingID, hosts[to].ID())
s, err := hosts[from].NewStream(ctx, protocol.TestingID, hosts[to].ID())
if err != nil {
log.Debugf("%d (%s) %d (%s)", from, hosts[from], to, hosts[to])
panic(err)
Expand Down Expand Up @@ -463,7 +465,8 @@ func TestAdding(t *testing.T) {
t.Fatalf("no network for %s", p1)
}

s, err := h1.NewStream(protocol.TestingID, p2)
ctx := context.Background()
s, err := h1.NewStream(ctx, protocol.TestingID, p2)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -559,7 +562,8 @@ func TestLimitedStreams(t *testing.T) {
link.SetOptions(opts)
}

s, err := hosts[0].NewStream(protocol.TestingID, hosts[1].ID())
ctx := context.Background()
s, err := hosts[0].NewStream(ctx, protocol.TestingID, hosts[1].ID())
if err != nil {
t.Fatal(err)
}
Expand Down
5 changes: 3 additions & 2 deletions p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,16 +192,17 @@ func (s *Swarm) SetStreamHandler(handler inet.StreamHandler) {
}

// NewStreamWithPeer creates a new stream on any available connection to p
func (s *Swarm) NewStreamWithPeer(p peer.ID) (*Stream, error) {
func (s *Swarm) NewStreamWithPeer(ctx context.Context, p peer.ID) (*Stream, error) {
// if we have no connections, try connecting.
if len(s.ConnectionsToPeer(p)) == 0 {
log.Debug("Swarm: NewStreamWithPeer no connections. Attempting to connect...")
if _, err := s.Dial(s.Context(), p); err != nil {
if _, err := s.Dial(ctx, p); err != nil {
return nil, err
}
}
log.Debug("Swarm: NewStreamWithPeer...")

// TODO: think about passing a context down to NewStreamWithGroup
st, err := s.swarm.NewStreamWithGroup(p)
return wrapStream(st), err
}
Expand Down
4 changes: 2 additions & 2 deletions p2p/net/swarm/swarm_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ func (n *Network) Connectedness(p peer.ID) inet.Connectedness {

// NewStream returns a new stream to given peer p.
// If there is no connection to p, attempts to create one.
func (n *Network) NewStream(p peer.ID) (inet.Stream, error) {
func (n *Network) NewStream(ctx context.Context, p peer.ID) (inet.Stream, error) {
log.Debugf("[%s] network opening stream to peer [%s]", n.local, p)
s, err := n.Swarm().NewStreamWithPeer(p)
s, err := n.Swarm().NewStreamWithPeer(ctx, p)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/swarm/swarm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func SubtestSwarm(t *testing.T, SwarmNum int, MsgNum int) {
defer wg.Done()

// first, one stream per peer (nice)
stream, err := s1.NewStreamWithPeer(p)
stream, err := s1.NewStreamWithPeer(ctx, p)
if err != nil {
errChan <- err
return
Expand Down
2 changes: 1 addition & 1 deletion p2p/protocol/ping/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (p *PingService) PingHandler(s inet.Stream) {
}

func (ps *PingService) Ping(ctx context.Context, p peer.ID) (<-chan time.Duration, error) {
s, err := ps.Host.NewStream(ID, p)
s, err := ps.Host.NewStream(ctx, ID, p)
if err != nil {
return nil, err
}
Expand Down
13 changes: 10 additions & 3 deletions p2p/protocol/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package relay
import (
"fmt"
"io"
"time"

mh "gx/Qma7dqy7ZVH4tkNJdC9TRrA82Uz5fQfbbwuvmNVVc17r7a/go-multihash"

host "github.com/ipfs/go-libp2p/p2p/host"
inet "github.com/ipfs/go-libp2p/p2p/net"
peer "github.com/ipfs/go-libp2p/p2p/peer"
protocol "github.com/ipfs/go-libp2p/p2p/protocol"
context "gx/QmacZi9WygGK7Me8mH53pypyscHzU386aUZXpr28GZgUct/context"
logging "gx/QmfZZB1aVXWA4kaR5R4e9NifERT366TTCSagkfhmAbYLsu/go-log"
)

Expand Down Expand Up @@ -83,10 +85,15 @@ func (rs *RelayService) consumeStream(s inet.Stream) error {

// pipeStream relays over a stream to a remote peer. It's like `cat`
func (rs *RelayService) pipeStream(src, dst peer.ID, s inet.Stream) error {
s2, err := rs.openStreamToPeer(dst)
// TODO: find a good way to pass contexts into here
nsctx, cancel := context.WithTimeout(context.TODO(), time.Second*30)
defer cancel()

s2, err := rs.openStreamToPeer(nsctx, dst)
if err != nil {
return fmt.Errorf("failed to open stream to peer: %s -- %s", dst, err)
}
cancel() // cancel here because this function might last a while

if err := WriteHeader(s2, src, dst); err != nil {
return err
Expand Down Expand Up @@ -116,8 +123,8 @@ func (rs *RelayService) pipeStream(src, dst peer.ID, s inet.Stream) error {
// openStreamToPeer opens a pipe to a remote endpoint
// for now, can only open streams to directly connected peers.
// maybe we can do some routing later on.
func (rs *RelayService) openStreamToPeer(p peer.ID) (inet.Stream, error) {
return rs.host.NewStream(ID, p)
func (rs *RelayService) openStreamToPeer(ctx context.Context, p peer.ID) (inet.Stream, error) {
return rs.host.NewStream(ctx, ID, p)
}

func ReadHeader(r io.Reader) (src, dst peer.ID, err error) {
Expand Down
6 changes: 3 additions & 3 deletions p2p/protocol/relay/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestRelaySimple(t *testing.T) {

// ok, now we can try to relay n1--->n2--->n3.
log.Debug("open relay stream")
s, err := n1.NewStream(relay.ID, n2p)
s, err := n1.NewStream(ctx, relay.ID, n2p)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -145,7 +145,7 @@ func TestRelayAcrossFour(t *testing.T) {

// ok, now we can try to relay n1--->n2--->n3--->n4--->n5
log.Debug("open relay stream")
s, err := n1.NewStream(relay.ID, n2p)
s, err := n1.NewStream(ctx, relay.ID, n2p)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -245,7 +245,7 @@ func TestRelayStress(t *testing.T) {

// ok, now we can try to relay n1--->n2--->n3.
log.Debug("open relay stream")
s, err := n1.NewStream(relay.ID, n2p)
s, err := n1.NewStream(ctx, relay.ID, n2p)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions p2p/test/backpressure/backpressure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ a problem.
}()

for {
s, err = host.NewStream(protocol.TestingID, remote)
s, err = host.NewStream(context.Background(), protocol.TestingID, remote)
if err != nil {
return
}
Expand Down Expand Up @@ -286,7 +286,7 @@ func TestStBackpressureStreamWrite(t *testing.T) {
}

// open a stream, from 2->1, this is our reader
s, err := h2.NewStream(protocol.TestingID, h1.ID())
s, err := h2.NewStream(context.Background(), protocol.TestingID, h1.ID())
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion p2p/test/reconnects/reconnect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func SubtestConnSendDisc(t *testing.T, hosts []host.Host) {
for i := 0; i < numStreams; i++ {
h1 := hosts[i%len(hosts)]
h2 := hosts[(i+1)%len(hosts)]
s, err := h1.NewStream(protocol.TestingID, h2.ID())
s, err := h1.NewStream(context.Background(), protocol.TestingID, h2.ID())
if err != nil {
t.Error(err)
}
Expand Down