diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 0f1ec8b3c6..73bbd4d7bc 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -162,8 +162,8 @@ func (h *BasicHost) RemoveStreamHandler(pid protocol.ID) { // header with given protocol.ID. If there is no connection to p, attempts // to create one. If ProtocolID is "", writes no header. // (Threadsafe) -func (h *BasicHost) NewStream(pid protocol.ID, p peer.ID) (inet.Stream, error) { - s, err := h.Network().NewStream(p) +func (h *BasicHost) NewStream(ctx context.Context, pid protocol.ID, p peer.ID) (inet.Stream, error) { + s, err := h.Network().NewStream(ctx, p) if err != nil { return nil, err } diff --git a/p2p/host/basic/basic_host_test.go b/p2p/host/basic/basic_host_test.go index cf30a63325..8a15000116 100644 --- a/p2p/host/basic/basic_host_test.go +++ b/p2p/host/basic/basic_host_test.go @@ -32,7 +32,7 @@ func TestHostSimple(t *testing.T) { io.Copy(w, s) // mirror everything }) - s, err := h1.NewStream(protocol.TestingID, h2pi.ID) + s, err := h1.NewStream(ctx, protocol.TestingID, h2pi.ID) if err != nil { t.Fatal(err) } diff --git a/p2p/host/host.go b/p2p/host/host.go index 9a03b42aff..6c8d25df91 100644 --- a/p2p/host/host.go +++ b/p2p/host/host.go @@ -56,7 +56,7 @@ type Host interface { // header with given protocol.ID. If there is no connection to p, attempts // to create one. If ProtocolID is "", writes no header. // (Threadsafe) - NewStream(pid protocol.ID, p peer.ID) (inet.Stream, error) + NewStream(ctx context.Context, pid protocol.ID, p peer.ID) (inet.Stream, error) // Close shuts down the host, its Network, and services. Close() error diff --git a/p2p/host/routed/routed.go b/p2p/host/routed/routed.go index c8b2f4867c..421f6c03a8 100644 --- a/p2p/host/routed/routed.go +++ b/p2p/host/routed/routed.go @@ -114,8 +114,8 @@ func (rh *RoutedHost) RemoveStreamHandler(pid protocol.ID) { rh.host.RemoveStreamHandler(pid) } -func (rh *RoutedHost) NewStream(pid protocol.ID, p peer.ID) (inet.Stream, error) { - return rh.host.NewStream(pid, p) +func (rh *RoutedHost) NewStream(ctx context.Context, pid protocol.ID, p peer.ID) (inet.Stream, error) { + return rh.host.NewStream(ctx, pid, p) } func (rh *RoutedHost) Close() error { // no need to close IpfsRouting. we dont own it. diff --git a/p2p/net/interface.go b/p2p/net/interface.go index 2cb73bca4f..95a59b9c7f 100644 --- a/p2p/net/interface.go +++ b/p2p/net/interface.go @@ -67,7 +67,7 @@ type Network interface { // NewStream returns a new stream to given peer p. // If there is no connection to p, attempts to create one. - NewStream(peer.ID) (Stream, error) + NewStream(context.Context, peer.ID) (Stream, error) // Listen tells the network to start listening on given multiaddrs. Listen(...ma.Multiaddr) error diff --git a/p2p/net/mock/mock_peernet.go b/p2p/net/mock/mock_peernet.go index d7f21a58c9..d78291fd0a 100644 --- a/p2p/net/mock/mock_peernet.go +++ b/p2p/net/mock/mock_peernet.go @@ -325,7 +325,7 @@ func (pn *peernet) Connectedness(p peer.ID) inet.Connectedness { // NewStream returns a new stream to given peer p. // If there is no connection to p, attempts to create one. -func (pn *peernet) NewStream(p peer.ID) (inet.Stream, error) { +func (pn *peernet) NewStream(ctx context.Context, p peer.ID) (inet.Stream, error) { pn.Lock() cs, found := pn.connsByPeer[p] if !found || len(cs) < 1 { diff --git a/p2p/net/mock/mock_test.go b/p2p/net/mock/mock_test.go index d8ce7b8200..3edec2d247 100644 --- a/p2p/net/mock/mock_test.go +++ b/p2p/net/mock/mock_test.go @@ -14,8 +14,8 @@ import ( protocol "github.com/ipfs/go-libp2p/p2p/protocol" testutil "github.com/ipfs/go-libp2p/testutil" - context "gx/QmacZi9WygGK7Me8mH53pypyscHzU386aUZXpr28GZgUct/context" detectrace "github.com/jbenet/go-detect-race" + context "gx/QmacZi9WygGK7Me8mH53pypyscHzU386aUZXpr28GZgUct/context" ) func randPeer(t *testing.T) peer.ID { @@ -208,21 +208,21 @@ func TestNetworkSetup(t *testing.T) { // p.NetworkConns(n3) // can create a stream 2->3, 3->2, - if _, err := n2.NewStream(p3); err != nil { + if _, err := n2.NewStream(ctx, p3); err != nil { t.Error(err) } - if _, err := n3.NewStream(p2); err != nil { + if _, err := n3.NewStream(ctx, p2); err != nil { t.Error(err) } // but not 1->2 nor 2->2 (not linked), nor 1->1 (not connected) - if _, err := n1.NewStream(p2); err == nil { + if _, err := n1.NewStream(ctx, p2); err == nil { t.Error("should not be able to connect") } - if _, err := n2.NewStream(p2); err == nil { + if _, err := n2.NewStream(ctx, p2); err == nil { t.Error("should not be able to connect") } - if _, err := n1.NewStream(p1); err == nil { + if _, err := n1.NewStream(ctx, p1); err == nil { t.Error("should not be able to connect") } @@ -232,7 +232,7 @@ func TestNetworkSetup(t *testing.T) { } // and a stream too - if _, err := n1.NewStream(p1); err != nil { + if _, err := n1.NewStream(ctx, p1); err != nil { t.Error(err) } @@ -265,13 +265,14 @@ func TestNetworkSetup(t *testing.T) { } // and a stream should work now too :) - if _, err := n2.NewStream(p3); err != nil { + if _, err := n2.NewStream(ctx, p3); err != nil { t.Error(err) } } func TestStreams(t *testing.T) { + ctx := context.Background() mn, err := FullMeshConnected(context.Background(), 3) if err != nil { @@ -297,7 +298,7 @@ func TestStreams(t *testing.T) { h.SetStreamHandler(protocol.TestingID, handler) } - s, err := hosts[0].NewStream(protocol.TestingID, hosts[1].ID()) + s, err := hosts[0].NewStream(ctx, protocol.TestingID, hosts[1].ID()) if err != nil { t.Fatal(err) } @@ -361,6 +362,7 @@ func makePonger(st string) func(inet.Stream) { } func TestStreamsStress(t *testing.T) { + ctx := context.Background() nnodes := 100 if detectrace.WithRace() { nnodes = 50 @@ -384,7 +386,7 @@ func TestStreamsStress(t *testing.T) { defer wg.Done() from := rand.Intn(len(hosts)) to := rand.Intn(len(hosts)) - s, err := hosts[from].NewStream(protocol.TestingID, hosts[to].ID()) + s, err := hosts[from].NewStream(ctx, protocol.TestingID, hosts[to].ID()) if err != nil { log.Debugf("%d (%s) %d (%s)", from, hosts[from], to, hosts[to]) panic(err) @@ -463,7 +465,8 @@ func TestAdding(t *testing.T) { t.Fatalf("no network for %s", p1) } - s, err := h1.NewStream(protocol.TestingID, p2) + ctx := context.Background() + s, err := h1.NewStream(ctx, protocol.TestingID, p2) if err != nil { t.Fatal(err) } @@ -559,7 +562,8 @@ func TestLimitedStreams(t *testing.T) { link.SetOptions(opts) } - s, err := hosts[0].NewStream(protocol.TestingID, hosts[1].ID()) + ctx := context.Background() + s, err := hosts[0].NewStream(ctx, protocol.TestingID, hosts[1].ID()) if err != nil { t.Fatal(err) } diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index a6ddae3ccd..41c2ca95b0 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -192,16 +192,17 @@ func (s *Swarm) SetStreamHandler(handler inet.StreamHandler) { } // NewStreamWithPeer creates a new stream on any available connection to p -func (s *Swarm) NewStreamWithPeer(p peer.ID) (*Stream, error) { +func (s *Swarm) NewStreamWithPeer(ctx context.Context, p peer.ID) (*Stream, error) { // if we have no connections, try connecting. if len(s.ConnectionsToPeer(p)) == 0 { log.Debug("Swarm: NewStreamWithPeer no connections. Attempting to connect...") - if _, err := s.Dial(s.Context(), p); err != nil { + if _, err := s.Dial(ctx, p); err != nil { return nil, err } } log.Debug("Swarm: NewStreamWithPeer...") + // TODO: think about passing a context down to NewStreamWithGroup st, err := s.swarm.NewStreamWithGroup(p) return wrapStream(st), err } diff --git a/p2p/net/swarm/swarm_net.go b/p2p/net/swarm/swarm_net.go index d8724a49f9..e5a6e0216a 100644 --- a/p2p/net/swarm/swarm_net.go +++ b/p2p/net/swarm/swarm_net.go @@ -132,9 +132,9 @@ func (n *Network) Connectedness(p peer.ID) inet.Connectedness { // NewStream returns a new stream to given peer p. // If there is no connection to p, attempts to create one. -func (n *Network) NewStream(p peer.ID) (inet.Stream, error) { +func (n *Network) NewStream(ctx context.Context, p peer.ID) (inet.Stream, error) { log.Debugf("[%s] network opening stream to peer [%s]", n.local, p) - s, err := n.Swarm().NewStreamWithPeer(p) + s, err := n.Swarm().NewStreamWithPeer(ctx, p) if err != nil { return nil, err } diff --git a/p2p/net/swarm/swarm_test.go b/p2p/net/swarm/swarm_test.go index c8a1ce548e..b6aa692cb0 100644 --- a/p2p/net/swarm/swarm_test.go +++ b/p2p/net/swarm/swarm_test.go @@ -129,7 +129,7 @@ func SubtestSwarm(t *testing.T, SwarmNum int, MsgNum int) { defer wg.Done() // first, one stream per peer (nice) - stream, err := s1.NewStreamWithPeer(p) + stream, err := s1.NewStreamWithPeer(ctx, p) if err != nil { errChan <- err return diff --git a/p2p/protocol/ping/ping.go b/p2p/protocol/ping/ping.go index 9f238c6958..5878e76992 100644 --- a/p2p/protocol/ping/ping.go +++ b/p2p/protocol/ping/ping.go @@ -50,7 +50,7 @@ func (p *PingService) PingHandler(s inet.Stream) { } func (ps *PingService) Ping(ctx context.Context, p peer.ID) (<-chan time.Duration, error) { - s, err := ps.Host.NewStream(ID, p) + s, err := ps.Host.NewStream(ctx, ID, p) if err != nil { return nil, err } diff --git a/p2p/protocol/relay/relay.go b/p2p/protocol/relay/relay.go index 8785dc10ec..96f581966d 100644 --- a/p2p/protocol/relay/relay.go +++ b/p2p/protocol/relay/relay.go @@ -3,6 +3,7 @@ package relay import ( "fmt" "io" + "time" mh "gx/Qma7dqy7ZVH4tkNJdC9TRrA82Uz5fQfbbwuvmNVVc17r7a/go-multihash" @@ -10,6 +11,7 @@ import ( inet "github.com/ipfs/go-libp2p/p2p/net" peer "github.com/ipfs/go-libp2p/p2p/peer" protocol "github.com/ipfs/go-libp2p/p2p/protocol" + context "gx/QmacZi9WygGK7Me8mH53pypyscHzU386aUZXpr28GZgUct/context" logging "gx/QmfZZB1aVXWA4kaR5R4e9NifERT366TTCSagkfhmAbYLsu/go-log" ) @@ -83,10 +85,15 @@ func (rs *RelayService) consumeStream(s inet.Stream) error { // pipeStream relays over a stream to a remote peer. It's like `cat` func (rs *RelayService) pipeStream(src, dst peer.ID, s inet.Stream) error { - s2, err := rs.openStreamToPeer(dst) + // TODO: find a good way to pass contexts into here + nsctx, cancel := context.WithTimeout(context.TODO(), time.Second*30) + defer cancel() + + s2, err := rs.openStreamToPeer(nsctx, dst) if err != nil { return fmt.Errorf("failed to open stream to peer: %s -- %s", dst, err) } + cancel() // cancel here because this function might last a while if err := WriteHeader(s2, src, dst); err != nil { return err @@ -116,8 +123,8 @@ func (rs *RelayService) pipeStream(src, dst peer.ID, s inet.Stream) error { // openStreamToPeer opens a pipe to a remote endpoint // for now, can only open streams to directly connected peers. // maybe we can do some routing later on. -func (rs *RelayService) openStreamToPeer(p peer.ID) (inet.Stream, error) { - return rs.host.NewStream(ID, p) +func (rs *RelayService) openStreamToPeer(ctx context.Context, p peer.ID) (inet.Stream, error) { + return rs.host.NewStream(ctx, ID, p) } func ReadHeader(r io.Reader) (src, dst peer.ID, err error) { diff --git a/p2p/protocol/relay/relay_test.go b/p2p/protocol/relay/relay_test.go index 5fd8408540..a939a7bc01 100644 --- a/p2p/protocol/relay/relay_test.go +++ b/p2p/protocol/relay/relay_test.go @@ -50,7 +50,7 @@ func TestRelaySimple(t *testing.T) { // ok, now we can try to relay n1--->n2--->n3. log.Debug("open relay stream") - s, err := n1.NewStream(relay.ID, n2p) + s, err := n1.NewStream(ctx, relay.ID, n2p) if err != nil { t.Fatal(err) } @@ -145,7 +145,7 @@ func TestRelayAcrossFour(t *testing.T) { // ok, now we can try to relay n1--->n2--->n3--->n4--->n5 log.Debug("open relay stream") - s, err := n1.NewStream(relay.ID, n2p) + s, err := n1.NewStream(ctx, relay.ID, n2p) if err != nil { t.Fatal(err) } @@ -245,7 +245,7 @@ func TestRelayStress(t *testing.T) { // ok, now we can try to relay n1--->n2--->n3. log.Debug("open relay stream") - s, err := n1.NewStream(relay.ID, n2p) + s, err := n1.NewStream(ctx, relay.ID, n2p) if err != nil { t.Fatal(err) } diff --git a/p2p/test/backpressure/backpressure_test.go b/p2p/test/backpressure/backpressure_test.go index 3ce1abd210..52e330a0fe 100644 --- a/p2p/test/backpressure/backpressure_test.go +++ b/p2p/test/backpressure/backpressure_test.go @@ -84,7 +84,7 @@ a problem. }() for { - s, err = host.NewStream(protocol.TestingID, remote) + s, err = host.NewStream(context.Background(), protocol.TestingID, remote) if err != nil { return } @@ -286,7 +286,7 @@ func TestStBackpressureStreamWrite(t *testing.T) { } // open a stream, from 2->1, this is our reader - s, err := h2.NewStream(protocol.TestingID, h1.ID()) + s, err := h2.NewStream(context.Background(), protocol.TestingID, h1.ID()) if err != nil { t.Fatal(err) } diff --git a/p2p/test/reconnects/reconnect_test.go b/p2p/test/reconnects/reconnect_test.go index c0f560f7e4..d00fcfe416 100644 --- a/p2p/test/reconnects/reconnect_test.go +++ b/p2p/test/reconnects/reconnect_test.go @@ -178,7 +178,7 @@ func SubtestConnSendDisc(t *testing.T, hosts []host.Host) { for i := 0; i < numStreams; i++ { h1 := hosts[i%len(hosts)] h2 := hosts[(i+1)%len(hosts)] - s, err := h1.NewStream(protocol.TestingID, h2.ID()) + s, err := h1.NewStream(context.Background(), protocol.TestingID, h2.ID()) if err != nil { t.Error(err) }