From ee0a7a48a742890ec67684124952ad853d7c6101 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Thu, 14 Nov 2024 00:31:20 +0800 Subject: [PATCH] fix(replication): fix wrong memory status block replication (#569) ### Motivation The follower didn't update the `lastAppendedOffset` status when the leader truncated the follower, which caused some expected entries to be filtered by deduplication logic and never recovered. The server will keep printing: ``` {"level":"warn","time":"2024-11-13T11:38:33.0884113Z","component":"follower-cursor","error":{"error":"rpc error: code = Unknown desc = 20874694 can not immediately follow 20874678: oxia: invalid next offset in wal","kind":"*status.Error","stack":null},"follower":"xxxxxx","namespace":"xxxxxxxx","shard":13,"term":307,"time":"2024-11-13T11:38:33.088438126Z","message":"Error while receiving acks"} ``` ### Modification - Align `lastAppendedOffset` alone with internal WAL headOffset. --- server/follower_controller.go | 1 + server/follower_controller_test.go | 53 ++++++++++++++++++++++++++++-- 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/server/follower_controller.go b/server/follower_controller.go index 4e2578ba..6fcbcdfa 100644 --- a/server/follower_controller.go +++ b/server/follower_controller.go @@ -337,6 +337,7 @@ func (fc *followerController) Truncate(req *proto.TruncateRequest) (*proto.Trunc return nil, errors.Wrapf(err, "failed to truncate wal. truncate-offset: %d - wal-last-offset: %d", req.HeadEntryId.Offset, fc.wal.LastOffset()) } + fc.lastAppendedOffset = headOffset return &proto.TruncateResponse{ HeadEntryId: &proto.EntryId{ diff --git a/server/follower_controller_test.go b/server/follower_controller_test.go index 4d640a29..8f776fa1 100644 --- a/server/follower_controller_test.go +++ b/server/follower_controller_test.go @@ -87,8 +87,7 @@ func TestFollower(t *testing.T) { wg := common.NewWaitGroup(1) go func() { - err := fc.Replicate(stream) - assert.ErrorIs(t, err, context.Canceled) + _ = fc.Replicate(stream) wg.Done() }() @@ -111,6 +110,56 @@ func TestFollower(t *testing.T) { assert.Equal(t, proto.ServingStatus_FOLLOWER, fc.Status()) assert.EqualValues(t, 1, fc.Term()) + // close follower + assert.NoError(t, fc.Close()) + + // new term to test if we can continue replicate messages + fc, err = NewFollowerController(Config{}, common.DefaultNamespace, shardId, walFactory, kvFactory) + assert.NoError(t, err) + assert.Equal(t, proto.ServingStatus_NOT_MEMBER, fc.Status()) + _, err = fc.NewTerm(&proto.NewTermRequest{Term: 2}) + assert.NoError(t, err) + assert.Equal(t, proto.ServingStatus_FENCED, fc.Status()) + assert.EqualValues(t, 2, fc.Term()) + truncateResp, err = fc.Truncate(&proto.TruncateRequest{ + Term: 2, + HeadEntryId: &proto.EntryId{ + Term: 1, + Offset: 0, + }, + }) + assert.NoError(t, err) + assert.EqualValues(t, 2, truncateResp.HeadEntryId.Term) + + assert.Equal(t, proto.ServingStatus_FOLLOWER, fc.Status()) + stream = newMockServerReplicateStream() + wg = common.NewWaitGroup(1) + go func() { + err := fc.Replicate(stream) + assert.ErrorIs(t, err, context.Canceled) + wg.Done() + }() + stream.AddRequest(createAddRequest(t, 2, 0, map[string]string{"a": "0", "b": "1"}, wal.InvalidOffset)) + // Wait for response + response = stream.GetResponse() + assert.Equal(t, proto.ServingStatus_FOLLOWER, fc.Status()) + assert.EqualValues(t, 0, response.Offset) + // Write next entry + stream.AddRequest(createAddRequest(t, 2, 1, map[string]string{"a": "4", "b": "5"}, wal.InvalidOffset)) + + // Wait for response + response = stream.GetResponse() + assert.EqualValues(t, 1, response.Offset) + + assert.Equal(t, proto.ServingStatus_FOLLOWER, fc.Status()) + assert.EqualValues(t, 2, fc.Term()) + + stream.AddRequest(createAddRequest(t, 2, 2, map[string]string{"a": "4", "b": "5"}, wal.InvalidOffset)) + response = stream.GetResponse() + assert.EqualValues(t, 2, response.Offset) + assert.Equal(t, proto.ServingStatus_FOLLOWER, fc.Status()) + assert.EqualValues(t, 2, fc.Term()) + // Double-check the values in the DB // Keys are not there because they were not part of the commit offset dbRes, err := fc.(*followerController).db.Get(&proto.GetRequest{