Skip to content

Commit

Permalink
Fixed buffered race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
echistyakov committed Dec 17, 2024
1 parent 473989b commit de163b3
Showing 1 changed file with 42 additions and 23 deletions.
65 changes: 42 additions & 23 deletions core/framing/buffered.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,83 +12,102 @@ import (

// bufferedFrame is basic frame implementation.
type bufferedFrame struct {
inner *common.ByteBuff
refs int32
innerPtr atomic.Pointer[common.ByteBuff]

Check failure on line 15 in core/framing/buffered.go

View workflow job for this annotation

GitHub Actions / runner / golangci-lint

[golangci] reported by reviewdog 🐶 type instantiation requires go1.18 or later (typecheck) Raw Output: core/framing/buffered.go:15:25: type instantiation requires go1.18 or later (typecheck) innerPtr atomic.Pointer[common.ByteBuff] ^

Check failure on line 15 in core/framing/buffered.go

View workflow job for this annotation

GitHub Actions / build

type instantiation requires go1.18 or later (-lang was set to go1.15; check go.mod)
refs atomic.Int32

Check failure on line 16 in core/framing/buffered.go

View workflow job for this annotation

GitHub Actions / runner / golangci-lint

[golangci] reported by reviewdog 🐶 undefined: atomic.Int32 (typecheck) Raw Output: core/framing/buffered.go:16:18: undefined: atomic.Int32 (typecheck) refs atomic.Int32 ^
}

func newBufferedFrame(inner *common.ByteBuff) *bufferedFrame {
return &bufferedFrame{
inner: inner,
refs: 1,
}
frame := &bufferedFrame{}
frame.innerPtr.Store(inner)
frame.refs.Store(1)
return frame
}

func (f *bufferedFrame) IncRef() int32 {
return atomic.AddInt32(&f.refs, 1)
return f.refs.Add(1)
}

func (f *bufferedFrame) RefCnt() int32 {
return atomic.LoadInt32(&f.refs)
return f.refs.Load()
}

func (f *bufferedFrame) Header() core.FrameHeader {
if f.inner == nil {
inner := f.innerPtr.Load()
if inner == nil {
panic("frame has been released!")
}
b := f.inner.Bytes()
b := inner.Bytes()
_ = b[core.FrameHeaderLen-1]
var h core.FrameHeader
copy(h[:], b)
return h
}

func (f *bufferedFrame) HasFlag(flag core.FrameFlag) bool {
if f.inner == nil {
inner := f.innerPtr.Load()
if inner == nil {
panic("frame has been released!")
}
n := binary.BigEndian.Uint16(f.inner.Bytes()[4:6])
n := binary.BigEndian.Uint16(inner.Bytes()[4:6])
return core.FrameFlag(n&0x03FF)&flag == flag
}

func (f *bufferedFrame) StreamID() uint32 {
if f.inner == nil {
inner := f.innerPtr.Load()
if inner == nil {
panic("frame has been released!")
}
return binary.BigEndian.Uint32(f.inner.Bytes()[:4])
return binary.BigEndian.Uint32(inner.Bytes()[:4])
}

// Release releases resource.
func (f *bufferedFrame) Release() {
if f != nil && f.inner != nil && atomic.AddInt32(&f.refs, -1) == 0 {
common.ReturnByteBuff(f.inner)
f.inner = nil
if f == nil {
return
}
refs := f.refs.Add(-1)
if refs > 0 {
return
}
inner := f.innerPtr.Load()
if inner != nil {
swapped := f.innerPtr.CompareAndSwap(inner, nil)
if swapped {
common.ReturnByteBuff(inner)
}
}
}

// Body returns frame body.
func (f *bufferedFrame) Body() []byte {
if f.inner == nil {
inner := f.innerPtr.Load()
if inner == nil {
return nil
}
b := f.inner.Bytes()
b := inner.Bytes()
_ = b[core.FrameHeaderLen-1]
return b[core.FrameHeaderLen:]
}

// Len returns length of frame.
func (f *bufferedFrame) Len() int {
if f.inner == nil {
inner := f.innerPtr.Load()
if inner == nil {
return 0
}
return f.inner.Len()
return inner.Len()
}

// WriteTo write frame to writer.
func (f *bufferedFrame) WriteTo(w io.Writer) (n int64, err error) {
if f == nil || f.inner == nil {
if f == nil {
return
}
inner := f.innerPtr.Load()
if inner == nil {
return
}
n, err = f.inner.WriteTo(w)
n, err = inner.WriteTo(w)
return
}

Expand Down

0 comments on commit de163b3

Please sign in to comment.