Skip to content

Commit

Permalink
Revert "fix(store): properly update store head (#207)" (#237)
Browse files Browse the repository at this point in the history
This reverts commit 18f0eb1. (#207)
  • Loading branch information
cristaloleg authored Jan 8, 2025
1 parent f512cad commit 996923f
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 359 deletions.
36 changes: 14 additions & 22 deletions store/heightsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package store
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"

Expand All @@ -29,35 +28,27 @@ func newHeightSub[H header.Header[H]]() *heightSub[H] {
}
}

func (hs *heightSub[H]) isInited() bool {
return hs.height.Load() != 0
// Height reports current height.
func (hs *heightSub[H]) Height() uint64 {
return hs.height.Load()
}

// setHeight sets the new head height for heightSub.
// Only higher than current height can be set.
func (hs *heightSub[H]) setHeight(height uint64) {
for {
curr := hs.height.Load()
if curr >= height {
return
}
if hs.height.CompareAndSwap(curr, height) {
return
}
}
// SetHeight sets the new head height for heightSub.
func (hs *heightSub[H]) SetHeight(height uint64) {
hs.height.Store(height)
}

// Sub subscribes for a header of a given height.
// It can return errElapsedHeight, which means a requested header was already provided
// and caller should get it elsewhere.
func (hs *heightSub[H]) Sub(ctx context.Context, height uint64) (H, error) {
var zero H
if hs.height.Load() >= height {
if hs.Height() >= height {
return zero, errElapsedHeight
}

hs.heightReqsLk.Lock()
if hs.height.Load() >= height {
if hs.Height() >= height {
// This is a rare case we have to account for.
// The lock above can park a goroutine long enough for hs.height to change for a requested height,
// leaving the request never fulfilled and the goroutine deadlocked.
Expand Down Expand Up @@ -90,20 +81,21 @@ func (hs *heightSub[H]) Sub(ctx context.Context, height uint64) (H, error) {

// Pub processes all the outstanding subscriptions matching the given headers.
// Pub is only safe when called from one goroutine.
// For Pub to work correctly, heightSub has to be initialized with setHeight
// For Pub to work correctly, heightSub has to be initialized with SetHeight
// so that given headers are contiguous to the height on heightSub.
func (hs *heightSub[H]) Pub(headers ...H) {
ln := len(headers)
if ln == 0 {
return
}

height := hs.Height()
from, to := headers[0].Height(), headers[ln-1].Height()
if from > to {
panic(fmt.Sprintf("from must be lower than to, have: %d and %d", from, to))
if height+1 != from && height != 0 { // height != 0 is needed to enable init from any height and not only 1
log.Fatalf("PLEASE FILE A BUG REPORT: headers given to the heightSub are in the wrong order: expected %d, got %d", height+1, from)
return
}

hs.setHeight(to)
hs.SetHeight(to)

hs.heightReqsLk.Lock()
defer hs.heightReqsLk.Unlock()
Expand Down
64 changes: 1 addition & 63 deletions store/heightsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestHeightSub(t *testing.T) {
{
h := headertest.RandDummyHeader(t)
h.HeightI = 100
hs.setHeight(99)
hs.SetHeight(99)
hs.Pub(h)

h, err := hs.Sub(ctx, 10)
Expand All @@ -47,68 +47,6 @@ func TestHeightSub(t *testing.T) {
}
}

// Test heightSub can accept non-adj headers without a problem.
func TestHeightSubNonAdjacement(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

hs := newHeightSub[*headertest.DummyHeader]()

{
h := headertest.RandDummyHeader(t)
h.HeightI = 100
hs.setHeight(99)
hs.Pub(h)
}

{
go func() {
// fixes flakiness on CI
time.Sleep(time.Millisecond)

h1 := headertest.RandDummyHeader(t)
h1.HeightI = 200
h2 := headertest.RandDummyHeader(t)
h2.HeightI = 300
hs.Pub(h1, h2)
}()

h, err := hs.Sub(ctx, 200)
assert.NoError(t, err)
assert.NotNil(t, h)
}
}

func TestHeightSub_monotonicHeight(t *testing.T) {
hs := newHeightSub[*headertest.DummyHeader]()

{
h := headertest.RandDummyHeader(t)
h.HeightI = 100
hs.setHeight(99)
hs.Pub(h)
}

{
h1 := headertest.RandDummyHeader(t)
h1.HeightI = 200
h2 := headertest.RandDummyHeader(t)
h2.HeightI = 300
hs.Pub(h1, h2)
}

{

h1 := headertest.RandDummyHeader(t)
h1.HeightI = 120
h2 := headertest.RandDummyHeader(t)
h2.HeightI = 130
hs.Pub(h1, h2)
}

assert.Equal(t, hs.height.Load(), uint64(300))
}

func TestHeightSubCancellation(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand Down
116 changes: 36 additions & 80 deletions store/store.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package store

import (
"cmp"
"context"
"errors"
"fmt"
"slices"
"sync/atomic"
"time"

Expand Down Expand Up @@ -53,7 +51,6 @@ type Store[H header.Header[H]] struct {
writesDn chan struct{}
// writeHead maintains the current write head
writeHead atomic.Pointer[H]

// pending keeps headers pending to be written in one batch
pending *batch[H]

Expand Down Expand Up @@ -115,7 +112,7 @@ func newStore[H header.Header[H]](ds datastore.Batching, opts ...Option) (*Store
}

func (s *Store[H]) Init(ctx context.Context, initial H) error {
if s.heightSub.isInited() {
if s.heightSub.Height() != 0 {
return errors.New("store already initialized")
}
// trust the given header as the initial head
Expand Down Expand Up @@ -167,37 +164,27 @@ func (s *Store[H]) Stop(ctx context.Context) error {
}

func (s *Store[H]) Height() uint64 {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

head, err := s.Head(ctx)
if err != nil {
if errors.Is(err, context.Canceled) ||
errors.Is(err, context.DeadlineExceeded) ||
errors.Is(err, datastore.ErrNotFound) {
return 0
}
panic(err)
}
return head.Height()
return s.heightSub.Height()
}

// Head returns the highest contiguous header written to the store.
func (s *Store[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, error) {
headPtr := s.writeHead.Load()
if headPtr != nil {
return *headPtr, nil
head, err := s.GetByHeight(ctx, s.heightSub.Height())
if err == nil {
return head, nil
}

head, err := s.readHead(ctx)
if err != nil {
var zero H
var zero H
head, err = s.readHead(ctx)
switch {
default:
return zero, err
case errors.Is(err, datastore.ErrNotFound), errors.Is(err, header.ErrNotFound):
return zero, header.ErrNoHead
case err == nil:
s.heightSub.SetHeight(head.Height())
log.Infow("loaded head", "height", head.Height(), "hash", head.Hash())
return head, nil
}

s.writeHead.CompareAndSwap(nil, &head)

return head, nil
}

func (s *Store[H]) Get(ctx context.Context, hash header.Hash) (H, error) {
Expand Down Expand Up @@ -244,16 +231,12 @@ func (s *Store[H]) GetByHeight(ctx context.Context, height uint64) (H, error) {
return h, nil
}

return s.getByHeight(ctx, height)
}

func (s *Store[H]) getByHeight(ctx context.Context, height uint64) (H, error) {
var zero H
hash, err := s.heightIndex.HashByHeight(ctx, height)
if err != nil {
if errors.Is(err, datastore.ErrNotFound) {
return zero, header.ErrNotFound
}

return zero, err
}

Expand Down Expand Up @@ -315,27 +298,29 @@ func (s *Store[H]) HasAt(_ context.Context, height uint64) bool {
return height != uint64(0) && s.Height() >= height
}

// Append the given headers to the store. Real write to the disk happens
// asynchronously and might fail without reporting error (just logging).
func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
lh := len(headers)
if lh == 0 {
return nil
}

var err error
// take current write head to verify headers against
head, err := s.Head(ctx)
if err != nil {
return err
var head H
headPtr := s.writeHead.Load()
if headPtr == nil {
head, err = s.Head(ctx)
if err != nil {
return err
}
} else {
head = *headPtr
}

slices.SortFunc(headers, func(a, b H) int {
return cmp.Compare(a.Height(), b.Height())
})

// collect valid headers
verified := make([]H, 0, lh)
for i, h := range headers {

err = head.Verify(h)
if err != nil {
var verErr *header.VerifyError
Expand All @@ -359,19 +344,27 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
head = h
}

onWrite := func() {
newHead := verified[len(verified)-1]
s.writeHead.Store(&newHead)
log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash())
s.metrics.newHead(newHead.Height())
}

// queue headers to be written on disk
select {
case s.writes <- verified:
// we return an error here after writing,
// as there might be an invalid header in between of a given range
onWrite()
return err
default:
s.metrics.writesQueueBlocked(ctx)
}

// if the writes queue is full, we block until it is not
select {
case s.writes <- verified:
onWrite()
return err
case <-s.writesDn:
return errStoppedStore
Expand Down Expand Up @@ -418,8 +411,6 @@ func (s *Store[H]) flushLoop() {
time.Sleep(sleep)
}

s.tryAdvanceHead(ctx, toFlush...)

s.metrics.flush(ctx, time.Since(startTime), s.pending.Len(), false)
// reset pending
s.pending.Reset()
Expand Down Expand Up @@ -508,41 +499,6 @@ func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) {
return data, nil
}

// try advance heighest writeHead based on passed or already written headers.
func (s *Store[H]) tryAdvanceHead(ctx context.Context, headers ...H) {
writeHead := s.writeHead.Load()
if writeHead == nil || len(headers) == 0 {
return
}

currHeight := (*writeHead).Height()

// advance based on passed headers.
for i := 0; i < len(headers); i++ {
if headers[i].Height() != currHeight+1 {
break
}
newHead := headers[i]
s.writeHead.Store(&newHead)
currHeight++
}

// TODO(cristaloleg): benchmark this timeout or make it dynamic.
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

// advance based on already written headers.
for {
h, err := s.getByHeight(ctx, currHeight+1)
if err != nil {
break
}
newHead := h
s.writeHead.Store(&newHead)
currHeight++
}
}

// indexTo saves mapping between header Height and Hash to the given batch.
func indexTo[H header.Header[H]](ctx context.Context, batch datastore.Batch, headers ...H) error {
for _, h := range headers {
Expand Down
Loading

0 comments on commit 996923f

Please sign in to comment.