Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[skip changelog] fix: Drop stream references on Close/Reset #760

Merged
merged 2 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ The following emojis are used to highlight certain changes:
- 🛠 `blockstore` and `blockservice`'s `WriteThrough()` option now takes an "enabled" parameter: `WriteThrough(enabled bool)`.
- Replaced unmaintained mock time implementation uses in tests: [from](github.com/benbjohnson/clock) => [to](github.com/filecoin-project/go-clock)
- updated to go-libp2p to [v0.38.0](https://github.com/libp2p/go-libp2p/releases/tag/v0.38.0)
- `bitswap/client`: if a libp2p connection has a context, use `context.AfterFunc` to cleanup the connection.


### Removed
Expand Down
72 changes: 57 additions & 15 deletions bitswap/network/ipfs_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,45 @@
receivers []Receiver
}

// interfaceWrapper is concrete type that wraps an interface. Necessary because
// atomic.Value needs the same type and can not Store(nil). This indirection
// allows us to store nil.
type interfaceWrapper[T any] struct {
t T
}
type atomicInterface[T any] struct {
iface atomic.Value
}

func (a *atomicInterface[T]) Load() T {
var v T
x := a.iface.Load()
if x != nil {
return x.(interfaceWrapper[T]).t
}
return v
}

func (a *atomicInterface[T]) Store(v T) {
a.iface.Store(interfaceWrapper[T]{v})
}

type streamMessageSender struct {
to peer.ID
stream network.Stream
connected bool
bsnet *impl
opts *MessageSenderOpts
to peer.ID
stream atomicInterface[network.Stream]
bsnet *impl
opts *MessageSenderOpts
}

type HasContext interface {
Context() context.Context
}

// Open a stream to the remote peer
func (s *streamMessageSender) Connect(ctx context.Context) (network.Stream, error) {
if s.connected {
return s.stream, nil
stream := s.stream.Load()
if stream != nil {
return stream, nil
}

tctx, cancel := context.WithTimeout(ctx, s.opts.SendTimeout)
Expand All @@ -107,30 +134,45 @@
if err != nil {
return nil, err
}
if withCtx, ok := stream.Conn().(HasContext); ok {
context.AfterFunc(withCtx.Context(), func() {
s.stream.Store(nil)
})

Check warning on line 140 in bitswap/network/ipfs_impl.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/ipfs_impl.go#L138-L140

Added lines #L138 - L140 were not covered by tests
}

s.stream = stream
s.connected = true
return s.stream, nil
s.stream.Store(stream)
return stream, nil
}

// Reset the stream
func (s *streamMessageSender) Reset() error {
if s.stream != nil {
err := s.stream.Reset()
s.connected = false
stream := s.stream.Load()
if stream != nil {
err := stream.Reset()
s.stream.Store(nil)
return err
}
return nil
}

// Close the stream
func (s *streamMessageSender) Close() error {
return s.stream.Close()
gammazero marked this conversation as resolved.
Show resolved Hide resolved
stream := s.stream.Load()
if stream != nil {
err := stream.Close()
s.stream.Store(nil)
return err
}
return nil
}

// Indicates whether the peer supports HAVE / DONT_HAVE messages
func (s *streamMessageSender) SupportsHave() bool {
return s.bsnet.SupportsHave(s.stream.Protocol())
stream := s.stream.Load()
if stream == nil {
return false
}

Check warning on line 174 in bitswap/network/ipfs_impl.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/ipfs_impl.go#L173-L174

Added lines #L173 - L174 were not covered by tests
return s.bsnet.SupportsHave(stream.Protocol())
}

// Send a message to the peer, attempting multiple times
Expand Down
Loading