From 708eb30f29a13fde3b3611ce4ec872ad149d266d Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Mon, 18 Nov 2024 11:26:21 -0800 Subject: [PATCH] fix(identify): push should not dial a new connection (#3035) * fix: identify: push should not dial a new connection * send IDPush on given conn --- p2p/protocol/identify/id.go | 34 ++++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 1733a4166c..b6d5240ba6 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -347,7 +347,8 @@ func (ids *idService) sendPushes(ctx context.Context) { defer func() { <-sem }() ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - str, err := ids.Host.NewStream(ctx, c.RemotePeer(), IDPush) + + str, err := newStreamAndNegotiate(ctx, c, IDPush) if err != nil { // connection might have been closed recently return } @@ -437,25 +438,38 @@ func (ids *idService) IdentifyWait(c network.Conn) <-chan struct{} { return e.IdentifyWaitChan } -func (ids *idService) identifyConn(c network.Conn) error { - ctx, cancel := context.WithTimeout(context.Background(), Timeout) - defer cancel() +// newStreamAndNegotiate opens a new stream on the given connection and negotiates the given protocol. +func newStreamAndNegotiate(ctx context.Context, c network.Conn, proto protocol.ID) (network.Stream, error) { s, err := c.NewStream(network.WithAllowLimitedConn(ctx, "identify")) if err != nil { log.Debugw("error opening identify stream", "peer", c.RemotePeer(), "error", err) - return err + return nil, err + } + err = s.SetDeadline(time.Now().Add(Timeout)) + if err != nil { + return nil, err } - s.SetDeadline(time.Now().Add(Timeout)) - if err := s.SetProtocol(ID); err != nil { + if err := s.SetProtocol(proto); err != nil { log.Warnf("error setting identify protocol for stream: %s", err) - s.Reset() + _ = s.Reset() } // ok give the response to our handler. - if err := msmux.SelectProtoOrFail(ID, s); err != nil { + if err := msmux.SelectProtoOrFail(proto, s); err != nil { log.Infow("failed negotiate identify protocol with peer", "peer", c.RemotePeer(), "error", err) - s.Reset() + _ = s.Reset() + return nil, err + } + return s, nil +} + +func (ids *idService) identifyConn(c network.Conn) error { + ctx, cancel := context.WithTimeout(context.Background(), Timeout) + defer cancel() + s, err := newStreamAndNegotiate(network.WithAllowLimitedConn(ctx, "identify"), c, ID) + if err != nil { + log.Debugw("error opening identify stream", "peer", c.RemotePeer(), "error", err) return err }