Skip to content

Commit

Permalink
raft,kvserver: add interface for log snapshot
Browse files Browse the repository at this point in the history
Epic: none
Release note: none
  • Loading branch information
pav-kv committed Sep 19, 2024
1 parent 28bf89b commit f41ae68
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 12 deletions.
21 changes: 21 additions & 0 deletions pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,27 @@ func (r *Replica) GetFirstIndex() kvpb.RaftIndex {
return r.raftFirstIndexRLocked()
}

// LogSnapshot returns a read-only view of the log storage, intended to be used
// as an immutable point-in-time snapshot.
//
// The method asserts that raftMu is held and that mu is held (at least for
// reading) at the time of the call. It currently returns the receiver itself:
// nothing is copied or pinned, so the returned value is only as immutable as
// the underlying storage. Callers are expected to keep raftMu held for as long
// as they use the returned snapshot.
func (r *replicaRaftStorage) LogSnapshot() raft.LogStorageRO {
r.raftMu.AssertHeld()
r.mu.AssertRHeld()
// TODO(pav-kv): return a wrapper which, in all methods, checks that the log
// storage hasn't been written to. A more relaxed version of it should assert
// that only the relevant part of the log hasn't been overwritten, e.g. a new
// term leader hasn't appended a log slice that truncated the log, or the log
// hasn't been wiped.
//
// This would require auditing and integrating with the write paths. Today,
// this type implements only reads, and writes are in various places like the
// logstore.LogStore type, or the code in the split handler which creates an
// empty range state.
//
// We don't need a fully fledged Pebble snapshot here. For our purposes, we
// can also make sure that raftMu is held for the entire period of using the
// LogSnapshot - this should guarantee its immutability.
return r
}

// GetLeaseAppliedIndex returns the lease index of the last applied command.
func (r *Replica) GetLeaseAppliedIndex() kvpb.LeaseAppliedIndex {
r.mu.RLock()
Expand Down
10 changes: 5 additions & 5 deletions pkg/raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type LogSnapshot struct {
// first is the first available log index.
first uint64
// storage contains the stable log entries.
storage LogStorage
storage LogStorageRO
// unstable contains the unstable log entries.
unstable logSlice
// logger gives access to logging errors.
Expand Down Expand Up @@ -443,7 +443,7 @@ func (l *raftLog) lastEntryID() entryID {
}

func (l *raftLog) term(i uint64) (uint64, error) {
return l.snap().term(i)
return l.snap(l.storage).term(i)
}

// term returns the term of the log entry at the given index.
Expand Down Expand Up @@ -555,7 +555,7 @@ func (l *raftLog) scan(lo, hi uint64, pageSize entryEncodingSize, v func([]pb.En
// The returned slice can be appended to, but the entries in it must not be
// mutated.
func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, error) {
return l.snap().slice(lo, hi, maxSize)
return l.snap(l.storage).slice(lo, hi, maxSize)
}

// LogSlice returns a valid log slice for a prefix of the (lo, hi] log index
Expand Down Expand Up @@ -667,10 +667,10 @@ func (l *raftLog) zeroTermOnOutOfBounds(t uint64, err error) uint64 {

// snap returns a point-in-time snapshot of the raft log. This snapshot can be
// read from while the underlying storage is not mutated.
func (l *raftLog) snap() LogSnapshot {
func (l *raftLog) snap(storage LogStorageRO) LogSnapshot {
return LogSnapshot{
first: l.firstIndex(),
storage: l.storage, // TODO(pav-kv): take a log storage "snapshot".
storage: storage,
unstable: l.unstable.logSlice,
logger: l.logger,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/raft/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ func TestIsOutOfBounds(t *testing.T) {
require.True(t, tt.wpanic)
}
}()
err := l.snap().mustCheckOutOfBounds(tt.lo, tt.hi)
err := l.snap(l.storage).mustCheckOutOfBounds(tt.lo, tt.hi)
require.False(t, tt.wpanic)
require.False(t, tt.wErrCompacted && err != ErrCompacted)
require.False(t, !tt.wErrCompacted && err != nil)
Expand Down
2 changes: 1 addition & 1 deletion pkg/raft/rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (rn *RawNode) Step(m pb.Message) error {
// while the snapshot is being used. Typically, this means that the application
// does not run a Ready() handling cycle until the snapshot is released.
func (rn *RawNode) LogSnapshot() LogSnapshot {
return rn.raft.raftLog.snap()
return rn.raft.raftLog.snap(rn.raft.raftLog.storage.LogSnapshot())
}

// Ready returns the outstanding work that the application needs to handle. This
Expand Down
28 changes: 23 additions & 5 deletions pkg/raft/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,8 @@ var ErrSnapOutOfDate = errors.New("requested index is older than the existing sn
// are unavailable.
var ErrUnavailable = errors.New("requested entry at index is unavailable")

// LogStorage is a read API for the raft log.
type LogStorage interface {
// InitialState returns the saved HardState and ConfState information.
InitialState() (pb.HardState, pb.ConfState, error)

// LogStorageRO is a read-only API for the raft log.
type LogStorageRO interface {
// Entries returns a slice of consecutive log entries in the range [lo, hi),
// starting from lo. The maxSize limits the total size of the log entries
// returned, but Entries returns at least one entry if any.
Expand Down Expand Up @@ -88,6 +85,14 @@ type LogStorage interface {
FirstIndex() (uint64, error)
}

// LogStorage is a read API for the raft log. It consists of the read-only
// LogStorageRO methods plus the ability to obtain a snapshot of the log
// storage through which those same read-only methods can be called.
type LogStorage interface {
LogStorageRO

// LogSnapshot returns an immutable point-in-time log storage snapshot.
//
// NOTE(review): the MemoryStorage implementation in this file returns the
// storage itself rather than a copy (see its TODO), so immutability there
// depends on the storage not being written to while the snapshot is in use.
LogSnapshot() LogStorageRO
}

// StateStorage provides read access to the state machine storage.
type StateStorage interface {
// Snapshot returns the most recent state machine snapshot.
Expand All @@ -102,6 +107,13 @@ type StateStorage interface {
//
// TODO(pav-kv): audit all error handling and document the contract.
type Storage interface {
// InitialState returns the saved HardState and ConfState information.
//
// TODO(sep-raft-log): this would need to be fetched (fully or partially) from
// both log and state machine storage on startup, to detect which of the two
// storages is ahead, and initialize correctly.
InitialState() (pb.HardState, pb.ConfState, error)

// LogStorage is the read API for the raft log, including LogSnapshot.
LogStorage
// StateStorage provides read access to the state machine storage.
StateStorage
}
Expand Down Expand Up @@ -211,6 +223,12 @@ func (ms *MemoryStorage) firstIndex() uint64 {
return ms.ents[0].Index + 1
}

// LogSnapshot implements the LogStorage interface.
//
// It currently returns ms itself, typed as the read-only LogStorageRO, rather
// than a copy. The result therefore shares state with ms and is not yet a true
// immutable snapshot.
func (ms *MemoryStorage) LogSnapshot() LogStorageRO {
// TODO(pav-kv): return an immutable subset of MemoryStorage.
return ms
}

// Snapshot implements the Storage interface.
func (ms *MemoryStorage) Snapshot() (pb.Snapshot, error) {
ms.Lock()
Expand Down

0 comments on commit f41ae68

Please sign in to comment.