Skip to content

Commit

Permalink
Fixed reopening of db in follower after a failed snapshot request (#349)
Browse files Browse the repository at this point in the history
When sending a snapshot fails (e.g. the leader sent a wrong term), the
follower is left with its database closed, and a subsequent `NewTerm()`
request panics with a `nil` pointer dereference.

```
{"level":"warn","component":"follower-controller","namespace":"matteo","shard":5,"term":9,"error":"rpc error: code = Code(101) desc = oxia: invalid term","time":"2023-06-10T19:13:06.66252744Z","message":"Error in handle Replicate stream"}
{"level":"warn","component":"internal-rpc-server","error":"rpc error: code = Code(101) desc = oxia: invalid term","namespace":"matteo","shard":5,"peer":"10.48.4.251:50338","time":"2023-06-10T19:13:06.662638529Z","message":"SendSnapshot failed"}

{"level":"info","component":"internal-rpc-server","req":{"namespace":"matteo","shardId":"5","term":"10"},"peer":"10.48.5.53:58010","time":"2023-06-10T19:13:31.823525387Z","message":"Received NewTerm request"}
{"level":"info","component":"internal-rpc-server","req":{"namespace":"matteo","shardId":"5","term":"10"},"peer":"10.48.5.53:58010","followerTerm":9,"time":"2023-06-10T19:13:31.823551459Z","message":"Found follower, initiating new term"}
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x60 pc=0x1a51c2d]

goroutine 189 [running]:
oxia/server.(*followerController).NewTerm(0xc00068d6c0, 0xc000aff630)
	/src/oxia/server/follower_controller.go:263 +0x1ed
oxia/server.(*internalRpcServer).NewTerm(0xc00046bd60, {0x23912d8, 0xc000a1a6f0}, 0xc000aff630)
	/src/oxia/server/internal_rpc_server.go:110 +0x634
oxia/proto._OxiaCoordination_NewTerm_Handler.func1({0x23912d8, 0xc000a1a6f0}, {0x1eddd00?, 0xc000aff630})
	/src/oxia/proto/replication_grpc.pb.go:207 +0x78
github.com/grpc-ecosystem/go-grpc-prometheus.(*ServerMetrics).UnaryServerInterceptor.func1({0x23912d8, 0xc000a1a6f0}, {0x1eddd00, 0xc000aff630}, 0xc000830a20?, 0xc0004240c0)
	/go/pkg/mod/github.com/grpc-ecosystem/[email protected]/server_metrics.go:107 +0x87
oxia/proto._OxiaCoordination_NewTerm_Handler({0x1f3ae00?, 0xc00046bd60}, {0x23912d8, 0xc000a1a6f0}, 0xc00035a930, 0xc000099050)
	/src/oxia/proto/replication_grpc.pb.go:209 +0x138
google.golang.org/grpc.(*Server).processUnaryRPC(0xc0000e4000, {0x2399778, 0xc0005804e0}, 0xc0000b8120, 0xc000327c50, 0x329e820, 0x0)
	/go/pkg/mod/google.golang.org/[email protected]/server.go:1345 +0xdf0
google.golang.org/grpc.(*Server).handleStream(0xc0000e4000, {0x2399778, 0xc0005804e0}, 0xc0000b8120, 0x0)
	/go/pkg/mod/google.golang.org/[email protected]/server.go:1722 +0xa2f
google.golang.org/grpc.(*Server).serveStreams.func1.2()
	/go/pkg/mod/google.golang.org/[email protected]/server.go:966 +0x98
created by google.golang.org/grpc.(*Server).serveStreams.func1
	/go/pkg/mod/google.golang.org/[email protected]/server.go:964 +0x28a

```
  • Loading branch information
merlimat authored Jun 12, 2023
1 parent b02c989 commit 4205465
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 0 deletions.
7 changes: 7 additions & 0 deletions server/follower_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,13 @@ func (fc *followerController) NewTerm(req *proto.NewTermRequest) (*proto.NewTerm
return nil, common.ErrorInvalidStatus
}

if fc.db == nil {
var err error
if fc.db, err = kv.NewDB(fc.namespace, fc.shardId, fc.kvFactory, fc.config.NotificationsRetentionTime, common.SystemClock); err != nil {
return nil, errors.Wrapf(err, "failed to reopen database")
}
}

if err := fc.db.UpdateTerm(req.Term); err != nil {
return nil, err
}
Expand Down
71 changes: 71 additions & 0 deletions server/follower_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,77 @@ func TestFollower_GetStatus(t *testing.T) {
assert.NoError(t, walFactory.Close())
}

// TestFollower_HandleSnapshotWithWrongTerm verifies that a follower can still
// accept a NewTerm request after a SendSnapshot call was rejected because the
// snapshot chunks carried a term different from the follower's current term.
//
// Regression check: after such a failed snapshot, NewTerm used to panic with a
// nil pointer dereference instead of completing.
func TestFollower_HandleSnapshotWithWrongTerm(t *testing.T) {
	var shardId int64
	kvFactory, err := kv.NewPebbleKVFactory(&kv.KVFactoryOptions{
		DataDir: t.TempDir(),
	})
	assert.NoError(t, err)
	walFactory := wal.NewInMemoryWalFactory()

	fc, err := NewFollowerController(Config{}, common.DefaultNamespace, shardId, walFactory, kvFactory)
	assert.NoError(t, err)

	// Fence the follower at term 1.
	_, err = fc.NewTerm(&proto.NewTermRequest{Term: 1})
	assert.NoError(t, err)
	assert.Equal(t, proto.ServingStatus_FENCED, fc.Status())
	assert.EqualValues(t, 1, fc.Term())

	stream := newMockServerReplicateStream()
	go func() { assert.NoError(t, fc.Replicate(stream)) }()

	stream.AddRequest(createAddRequest(t, 1, 0, map[string]string{"a": "0", "b": "1"}, 0))

	// Wait for acks
	r1 := stream.GetResponse()
	assert.Equal(t, proto.ServingStatus_FOLLOWER, fc.Status())
	assert.EqualValues(t, 0, r1.Offset)
	close(stream.requests)

	// Load snapshot into follower
	snapshot := prepareTestDb(t)

	snapshotStream := newMockServerSendSnapshotStream()

	wg := common.NewWaitGroup(1)

	// Run SendSnapshot in the background and report its outcome through wg.
	go func() {
		if err := fc.SendSnapshot(snapshotStream); err != nil {
			wg.Fail(err)
			return
		}
		wg.Done()
	}()

	// Feed every chunk with term 2, which does not match the follower's
	// current term (1).
	for ; snapshot.Valid(); snapshot.Next() {
		chunk, err := snapshot.Chunk()
		assert.NoError(t, err)
		content := chunk.Content()
		snapshotStream.AddChunk(&proto.SnapshotChunk{
			Term:       2,
			Name:       chunk.Name(),
			Content:    content,
			ChunkIndex: chunk.Index(),
			ChunkCount: chunk.TotalCount(),
		})
	}

	close(snapshotStream.chunks)

	// The snapshot sending should fail because the term is invalid.
	// assert.ErrorIs takes (t, err, target): the returned error comes first
	// and the sentinel second, so a wrapped ErrorInvalidTerm still matches.
	assert.ErrorIs(t, wg.Wait(context.Background()), common.ErrorInvalidTerm)

	// A new term must still be accepted after the failed snapshot.
	_, err = fc.NewTerm(&proto.NewTermRequest{Term: 5})
	assert.NoError(t, err)
	assert.Equal(t, proto.ServingStatus_FENCED, fc.Status())
	assert.EqualValues(t, 5, fc.Term())

	assert.NoError(t, fc.Close())
	assert.NoError(t, kvFactory.Close())
	assert.NoError(t, walFactory.Close())
}

func closeChanIsNotNil(fc FollowerController) func() bool {
return func() bool {
_fc := fc.(*followerController)
Expand Down

0 comments on commit 4205465

Please sign in to comment.