Skip to content

Commit

Permalink
fix(store): heartbeat with wrong block state
Browse files Browse the repository at this point in the history
Signed-off-by: James Yin <[email protected]>
  • Loading branch information
ifplusor committed Feb 9, 2023
1 parent 2b99026 commit 2a78cb5
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 11 deletions.
11 changes: 10 additions & 1 deletion internal/controller/eventbus/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ func (ctrl *controller) SegmentHeartbeat(srv ctrlpb.SegmentController_SegmentHea
}
return nil
}

func (ctrl *controller) processHeartbeat(ctx context.Context, req *ctrlpb.SegmentHeartbeatRequest) error {
if !ctrl.member.IsLeader() {
return errors.ErrNotLeader
Expand Down Expand Up @@ -488,7 +489,15 @@ func (ctrl *controller) GetAppendableSegment(ctx context.Context,
}

func (ctrl *controller) ReportSegmentBlockIsFull(ctx context.Context,
req *ctrlpb.SegmentHeartbeatRequest) (*emptypb.Empty, error) {
req *ctrlpb.SegmentHeartbeatRequest,
) (*emptypb.Empty, error) {
for _, info := range req.GetHealthInfo() {
log.Info(ctx, "Received segment block is full report.", map[string]interface{}{
"block_id": vanus.NewIDFromUint64(info.GetId()),
"event_num": info.GetEventNumber(),
"event_size": info.GetSize(),
})
}
if err := ctrl.processHeartbeat(ctx, req); err != nil {
return nil, err
}
Expand Down
10 changes: 9 additions & 1 deletion internal/controller/eventbus/eventlog/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ import (
"encoding/json"
"time"

"github.com/linkall-labs/vanus/observability/log"
metapb "github.com/linkall-labs/vanus/proto/pkg/meta"

"github.com/linkall-labs/vanus/internal/controller/eventbus/metadata"
"github.com/linkall-labs/vanus/internal/primitive/vanus"
metapb "github.com/linkall-labs/vanus/proto/pkg/meta"
)

type SegmentState string
Expand Down Expand Up @@ -84,6 +86,12 @@ func (seg *Segment) isNeedUpdate(newSeg Segment) bool {
}
// TODO(wenfeng): follow state shift
if newSeg.State != "" && seg.State != newSeg.State {
log.Info(context.Background(), "Update segment by state.", map[string]interface{}{
log.KeySegmentID: newSeg.ID,
"event_num": newSeg.Number,
"event_size": newSeg.Size,
"state": newSeg.State,
})
seg.State = newSeg.State
needed = true
}
Expand Down
2 changes: 1 addition & 1 deletion internal/store/raft/block/appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (a *appender) applyEntries(ctx context.Context, committedEntries []raftpb.E
// })
a.onAppend(ctx, index)

if a.appendLis != nil {
if frag != nil && a.appendLis != nil {
a.appendLis(a.ID())
}
})
Expand Down
6 changes: 4 additions & 2 deletions internal/store/segment/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,8 +593,10 @@ func (s *server) onEntryAppended(block vanus.ID) {
func (s *server) onBlockArchived(stat block.Statistics) {
id := stat.ID

log.Debug(context.Background(), "Block is full.", map[string]interface{}{
"block_id": id,
log.Info(context.Background(), "Block is full.", map[string]interface{}{
"block_id": id,
"event_num": stat.EntryNum,
"event_size": stat.EntrySize,
})

// FIXME(james.yin): leader info.
Expand Down
9 changes: 6 additions & 3 deletions internal/store/vsb/block_append.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,16 +177,17 @@ func (b *vsBlock) CommitAppend(ctx context.Context, frag block.Fragment, cb bloc
return
}

atomic.StoreUint32(&b.actx.archived, 1)

b.wg.Add(1)
b.s.Append(bytes.NewReader(frag.Payload()), func(n int, err error) {
if len(indexes) != 0 {
if len(indexes) != 0 { // always false currently.
b.mu.Lock()
b.indexes = append(b.indexes, indexes...)
b.mu.Unlock()
}

// NOTE: must update archived flag after append all indexes.
atomic.StoreUint32(&b.actx.archived, 1)

cb()

m, i := makeSnapshot(b.actx, b.indexes)
Expand All @@ -202,6 +203,8 @@ func (b *vsBlock) CommitAppend(ctx context.Context, frag block.Fragment, cb bloc
b.lis.OnArchived(b.stat(m, i))
}
})
// No more data, so call Sync() to avoid waiting.
b.s.Sync()
}

func (b *vsBlock) buildIndexes(
Expand Down
2 changes: 1 addition & 1 deletion internal/store/vsb/codec/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"hash/crc32"
"io"

// first-party project.
// first-party libraries.
"github.com/linkall-labs/vanus/observability/tracing"
"github.com/linkall-labs/vanus/pkg/errors"

Expand Down
2 changes: 1 addition & 1 deletion internal/store/wal/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
// third-party project.
"github.com/ncw/directio"

// first-party project.
// first-party libraries.
"github.com/linkall-labs/vanus/observability/log"

// this project.
Expand Down
2 changes: 1 addition & 1 deletion internal/store/wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"errors"
"sync"

// first-party project.
// first-party libraries.
"github.com/linkall-labs/vanus/observability/log"

// this project.
Expand Down

0 comments on commit 2a78cb5

Please sign in to comment.