Skip to content

Commit

Permalink
hehe
Browse files Browse the repository at this point in the history
  • Loading branch information
cristaloleg committed Jan 27, 2025
1 parent e9fbde6 commit 07babe7
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 36 deletions.
36 changes: 17 additions & 19 deletions store/heightsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@ import (
"errors"
"sync"
"sync/atomic"

"github.com/celestiaorg/go-header"
)

// errElapsedHeight is thrown when a requested height was already provided to heightSub.
var errElapsedHeight = errors.New("elapsed height")

// heightSub provides a minimalistic mechanism to wait till header for a height becomes available.
type heightSub[H header.Header[H]] struct {
type heightSub struct {
// height refers to the latest locally available header height
// that has been fully verified and inserted into the subjective chain
height atomic.Uint64
Expand All @@ -27,35 +25,35 @@ type sub struct {
}

// newHeightSub instantiates new heightSub.
func newHeightSub[H header.Header[H]]() *heightSub[H] {
return &heightSub[H]{
func newHeightSub() *heightSub {
return &heightSub{
heightSubs: make(map[uint64]*sub),
}
}

// Init the heightSub with a given height.
// Notifies all awaiting [WaitHeight] calls lower than height.
func (hs *heightSub[H]) Init(height uint64) {
// Notifies all awaiting [Wait] calls lower than height.
func (hs *heightSub) Init(height uint64) {
hs.height.Store(height)

hs.heightSubsLk.Lock()
defer hs.heightSubsLk.Unlock()

for h := range hs.heightSubs {
if h < height {
hs.notifyHeight(h, true)
hs.notify(h, true)
}
}
}

// Height reports current height.
func (hs *heightSub[H]) Height() uint64 {
func (hs *heightSub) Height() uint64 {
return hs.height.Load()
}

// SetHeight sets the new head height for heightSub.
// Notifies all awaiting [WaitHeight] calls in range from [heightSub.Height] to height.
func (hs *heightSub[H]) SetHeight(height uint64) {
// Notifies all awaiting [Wait] calls in range from [heightSub.Height] to height.
func (hs *heightSub) SetHeight(height uint64) {
for {
curr := hs.height.Load()
if curr >= height {
Expand All @@ -69,16 +67,16 @@ func (hs *heightSub[H]) SetHeight(height uint64) {
defer hs.heightSubsLk.Unlock() //nolint:gocritic we have a return below

for ; curr <= height; curr++ {
hs.notifyHeight(curr, true)
hs.notify(curr, true)
}
return
}
}

// WaitHeight for a given height to be published.
// Wait for a given height to be published.
// It can return errElapsedHeight, which means a requested height was already seen
// and caller should get it elsewhere.
func (hs *heightSub[H]) WaitHeight(ctx context.Context, height uint64) error {
func (hs *heightSub) Wait(ctx context.Context, height uint64) error {
if hs.Height() >= height {
return errElapsedHeight
}
Expand Down Expand Up @@ -108,22 +106,22 @@ func (hs *heightSub[H]) WaitHeight(ctx context.Context, height uint64) error {
case <-ctx.Done():
// no need to keep the request, if the op has canceled
hs.heightSubsLk.Lock()
hs.notifyHeight(height, false)
hs.notify(height, false)
hs.heightSubsLk.Unlock()
return ctx.Err()
}
}

// NotifyHeight and release the waiters in [WaitHeight].
// Notify and release the waiters in [Wait].
// Note: do not advance heightSub's height.
func (hs *heightSub[H]) NotifyHeight(height uint64) {
func (hs *heightSub) Notify(height uint64) {
hs.heightSubsLk.Lock()
defer hs.heightSubsLk.Unlock()

hs.notifyHeight(height, true)
hs.notify(height, true)
}

func (hs *heightSub[H]) notifyHeight(height uint64, all bool) {
func (hs *heightSub) notify(height uint64, all bool) {
sac, ok := hs.heightSubs[height]
if !ok {
return
Expand Down
26 changes: 13 additions & 13 deletions store/heightsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ func TestHeightSub(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

hs := newHeightSub[*headertest.DummyHeader]()
hs := newHeightSub()

// assert subscription returns nil for past heights
{
hs.Init(99)

err := hs.WaitHeight(ctx, 10)
err := hs.Wait(ctx, 10)
assert.ErrorIs(t, err, errElapsedHeight)
}

Expand All @@ -33,7 +33,7 @@ func TestHeightSub(t *testing.T) {
hs.SetHeight(102)
}()

err := hs.WaitHeight(ctx, 101)
err := hs.Wait(ctx, 101)
assert.NoError(t, err)
}

Expand All @@ -42,7 +42,7 @@ func TestHeightSub(t *testing.T) {
ch := make(chan error, 10)
for range cap(ch) {
go func() {
err := hs.WaitHeight(ctx, 103)
err := hs.Wait(ctx, 103)
ch <- err
}()
}
Expand All @@ -61,7 +61,7 @@ func TestHeightSub_withWaitCancelled(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

hs := newHeightSub[*headertest.DummyHeader]()
hs := newHeightSub()
hs.Init(10)

const waiters = 5
Expand All @@ -76,15 +76,15 @@ func TestHeightSub_withWaitCancelled(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Duration(i+1)*time.Millisecond)
defer cancel()

err := hs.WaitHeight(ctx, 100)
err := hs.Wait(ctx, 100)
cancelChs[i] <- err
}()

go func() {
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()

err := hs.WaitHeight(ctx, 100)
err := hs.Wait(ctx, 100)
blockedChs[i] <- err
}()
}
Expand All @@ -108,7 +108,7 @@ func TestHeightSubNonAdjacement(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

hs := newHeightSub[*headertest.DummyHeader]()
hs := newHeightSub()
hs.Init(99)

go func() {
Expand All @@ -118,13 +118,13 @@ func TestHeightSubNonAdjacement(t *testing.T) {
hs.SetHeight(300)
}()

err := hs.WaitHeight(ctx, 200)
err := hs.Wait(ctx, 200)
assert.NoError(t, err)
}

// Test heightSub's height cannot go down but only up.
func TestHeightSub_monotonicHeight(t *testing.T) {
hs := newHeightSub[*headertest.DummyHeader]()
hs := newHeightSub()

hs.Init(99)
assert.Equal(t, int64(hs.height.Load()), int64(99))
Expand All @@ -142,12 +142,12 @@ func TestHeightSubCancellation(t *testing.T) {

h := headertest.RandDummyHeader(t)
h.HeightI %= 1000 // make it a bit lower
hs := newHeightSub[*headertest.DummyHeader]()
hs := newHeightSub()

sub := make(chan struct{})
go func() {
// subscribe first time
hs.WaitHeight(ctx, h.Height())
hs.Wait(ctx, h.Height())
sub <- struct{}{}
}()

Expand All @@ -157,7 +157,7 @@ func TestHeightSubCancellation(t *testing.T) {
// subscribe again but with failed canceled context
canceledCtx, cancel := context.WithCancel(ctx)
cancel()
err := hs.WaitHeight(canceledCtx, h.Height())
err := hs.Wait(canceledCtx, h.Height())
assert.ErrorIs(t, err, context.Canceled)

// update height
Expand Down
8 changes: 4 additions & 4 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type Store[H header.Header[H]] struct {
heightIndex *heightIndexer[H]
// manages current store read head height (1) and
// allows callers to wait until header for a height is stored (2)
heightSub *heightSub[H]
heightSub *heightSub

// writing to datastore
//
Expand Down Expand Up @@ -103,7 +103,7 @@ func newStore[H header.Header[H]](ds datastore.Batching, opts ...Option) (*Store
cache: cache,
metrics: metrics,
heightIndex: index,
heightSub: newHeightSub[H](),
heightSub: newHeightSub(),
writes: make(chan []H, 16),
writesDn: make(chan struct{}),
pending: newBatch[H](params.WriteBatchSize),
Expand Down Expand Up @@ -222,7 +222,7 @@ func (s *Store[H]) GetByHeight(ctx context.Context, height uint64) (H, error) {
// if the requested 'height' was not yet published
// we subscribe to it
if head := s.contiguousHead.Load(); head == nil || height > (*head).Height() {
err := s.heightSub.WaitHeight(ctx, height)
err := s.heightSub.Wait(ctx, height)
if err != nil && !errors.Is(err, errElapsedHeight) {
return zero, err
}
Expand Down Expand Up @@ -511,7 +511,7 @@ func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) {
func (s *Store[H]) notifyAndAdvance(ctx context.Context, headers ...H) {
// always inform heightSub about new headers seen
for _, h := range headers {
s.heightSub.NotifyHeight(h.Height())
s.heightSub.Notify(h.Height())
}

currHead := s.contiguousHead.Load()
Expand Down

0 comments on commit 07babe7

Please sign in to comment.