Merge pull request #88142 from itsbilal/range-dir-cleanup-21.2
[release-21.2] kvserver: Clean up empty range directories after snapshots
itsbilal authored Sep 19, 2022
2 parents d3d7933 + 520de9c commit 69252b3
Showing 4 changed files with 184 additions and 6 deletions.
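In outline, the change ref-counts snapshot scratch spaces per range: NewScratchSpace increments a count for the range, the new Close method decrements it, and whichever Close drops the count to zero removes the now-empty per-range directory. A minimal sketch of that pattern, with illustrative names (dirRefs, acquire, and release are not from this PR):

package main

import "sync"

// dirRefs sketches the ref-counting scheme this PR adds: one count per
// range guards removal of the shared per-range directory.
type dirRefs struct {
	mu   sync.Mutex
	refs map[int]int // rangeID -> number of open scratch spaces
}

func (d *dirRefs) acquire(rangeID int) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.refs[rangeID]++
}

// release reports whether the caller should remove the range directory:
// true only when the last scratch space for the range has closed.
func (d *dirRefs) release(rangeID int) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.refs[rangeID]--
	if d.refs[rangeID] > 0 {
		return false
	}
	delete(d.refs, rangeID)
	return true
}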
8 changes: 7 additions & 1 deletion pkg/kv/kvserver/replica_learner_test.go
@@ -143,10 +143,16 @@ func TestAddReplicaViaLearner(t *testing.T) {
 	// The happy case! \o/
 
 	blockUntilSnapshotCh := make(chan struct{})
+	var receivedSnap int64
 	blockSnapshotsCh := make(chan struct{})
 	knobs, ltk := makeReplicationTestKnobs()
 	ltk.storeKnobs.ReceiveSnapshot = func(h *kvserver.SnapshotRequest_Header) error {
-		close(blockUntilSnapshotCh)
+		if atomic.CompareAndSwapInt64(&receivedSnap, 0, 1) {
+			close(blockUntilSnapshotCh)
+		} else {
+			// Do nothing. We aren't interested in subsequent snapshots.
+			return nil
+		}
 		select {
 		case <-blockSnapshotsCh:
 		case <-time.After(10 * time.Second):
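The CAS above guards against a double close: now that the knob can fire for more than one snapshot, a bare close on the second invocation would panic, since closing an already-closed channel panics. sync.Once expresses the same close-exactly-once guarantee; a minimal sketch (illustrative, not code from this PR):

package main

import "sync"

// onceCloser returns a function that closes ch exactly once, however many
// goroutines call it; a second bare close would panic otherwise.
func onceCloser(ch chan struct{}) func() {
	var once sync.Once
	return func() { once.Do(func() { close(ch) }) }
}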
55 changes: 53 additions & 2 deletions pkg/kv/kvserver/replica_sst_snapshot_storage.go
@@ -19,6 +19,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/storage/fs"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
 	"golang.org/x/time/rate"
@@ -31,6 +32,10 @@ type SSTSnapshotStorage struct {
 	engine  storage.Engine
 	limiter *rate.Limiter
 	dir     string
+	mu      struct {
+		syncutil.Mutex
+		rangeRefCount map[roachpb.RangeID]int
+	}
 }
 
 // NewSSTSnapshotStorage creates a new SST snapshot storage.
@@ -39,6 +44,10 @@ func NewSSTSnapshotStorage(engine storage.Engine, limiter *rate.Limiter) SSTSnap
 		engine:  engine,
 		limiter: limiter,
 		dir:     filepath.Join(engine.GetAuxiliaryDir(), "sstsnapshot"),
+		mu: struct {
+			syncutil.Mutex
+			rangeRefCount map[roachpb.RangeID]int
+		}{rangeRefCount: make(map[roachpb.RangeID]int)},
 	}
 }
 
@@ -47,9 +56,13 @@ func NewSSTSnapshotStorage(engine storage.Engine, limiter *rate.Limiter) SSTSnap
 func (s *SSTSnapshotStorage) NewScratchSpace(
 	rangeID roachpb.RangeID, snapUUID uuid.UUID,
 ) *SSTSnapshotStorageScratch {
+	s.mu.Lock()
+	s.mu.rangeRefCount[rangeID]++
+	s.mu.Unlock()
 	snapDir := filepath.Join(s.dir, strconv.Itoa(int(rangeID)), snapUUID.String())
 	return &SSTSnapshotStorageScratch{
 		storage: s,
+		rangeID: rangeID,
 		snapDir: snapDir,
 	}
 }
@@ -59,14 +72,38 @@ func (s *SSTSnapshotStorage) Clear() error {
 	return s.engine.RemoveAll(s.dir)
 }
 
+// scratchClosed is called when an SSTSnapshotStorageScratch created by this
+// SSTSnapshotStorage is closed. This method handles any cleanup of range
+// directories if all SSTSnapshotStorageScratches corresponding to a range
+// have closed.
+func (s *SSTSnapshotStorage) scratchClosed(rangeID roachpb.RangeID) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	val := s.mu.rangeRefCount[rangeID]
+	if val <= 0 {
+		panic("inconsistent scratch ref count")
+	}
+	val--
+	s.mu.rangeRefCount[rangeID] = val
+	if val == 0 {
+		delete(s.mu.rangeRefCount, rangeID)
+		// Suppressing an error here is okay, as orphaned directories are at worst
+		// a performance issue when we later walk directories in pebble.Capacity()
+		// but not a correctness issue.
+		_ = s.engine.RemoveAll(filepath.Join(s.dir, strconv.Itoa(int(rangeID))))
+	}
+}
+
 // SSTSnapshotStorageScratch keeps track of the SST files incrementally created
 // when receiving a snapshot. Each scratch is associated with a specific
 // snapshot.
 type SSTSnapshotStorageScratch struct {
 	storage    *SSTSnapshotStorage
+	rangeID    roachpb.RangeID
 	ssts       []string
 	snapDir    string
 	dirCreated bool
+	closed     bool
 }
 
 func (s *SSTSnapshotStorageScratch) filename(id int) string {
@@ -87,6 +124,9 @@ func (s *SSTSnapshotStorageScratch) createDir() error {
 func (s *SSTSnapshotStorageScratch) NewFile(
 	ctx context.Context, bytesPerSync int64,
 ) (*SSTSnapshotStorageFile, error) {
+	if s.closed {
+		return nil, errors.AssertionFailedf("SSTSnapshotStorageScratch closed")
+	}
 	id := len(s.ssts)
 	filename := s.filename(id)
 	s.ssts = append(s.ssts, filename)
@@ -103,6 +143,9 @@ func (s *SSTSnapshotStorageScratch) NewFile(
 // the provided SST when it is finished using it. If the provided SST is empty,
 // then no file will be created and nothing will be written.
 func (s *SSTSnapshotStorageScratch) WriteSST(ctx context.Context, data []byte) error {
+	if s.closed {
+		return errors.AssertionFailedf("SSTSnapshotStorageScratch closed")
+	}
 	if len(data) == 0 {
 		return nil
 	}
@@ -129,8 +172,13 @@ func (s *SSTSnapshotStorageScratch) SSTs() []string {
 	return s.ssts
 }
 
-// Clear removes the directory and SSTs created for a particular snapshot.
-func (s *SSTSnapshotStorageScratch) Clear() error {
+// Close removes the directory and SSTs created for a particular snapshot.
+func (s *SSTSnapshotStorageScratch) Close() error {
+	if s.closed {
+		return nil
+	}
+	s.closed = true
+	defer s.storage.scratchClosed(s.rangeID)
 	return s.storage.engine.RemoveAll(s.snapDir)
 }
 
@@ -157,6 +205,9 @@ func (f *SSTSnapshotStorageFile) ensureFile() error {
 			return err
 		}
 	}
+	if f.scratch.closed {
+		return errors.AssertionFailedf("SSTSnapshotStorageScratch closed")
+	}
 	var err error
 	if f.bytesPerSync > 0 {
 		f.file, err = f.scratch.storage.engine.CreateWithSync(f.filename, int(f.bytesPerSync))
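Taken together, the new contract is: every scratch obtained from NewScratchSpace must be Closed, Close is idempotent, and the last Close for a range also removes the range directory. A hypothetical caller under that contract (writeSnapshotSST is not part of this PR; error handling is abbreviated):

package main

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// writeSnapshotSST sketches the lifecycle the new code assumes: pair every
// NewScratchSpace with a Close so the range's ref count stays consistent.
func writeSnapshotSST(
	ctx context.Context,
	storage *kvserver.SSTSnapshotStorage,
	rangeID roachpb.RangeID,
	snapUUID uuid.UUID,
	data []byte,
) error {
	scratch := storage.NewScratchSpace(rangeID, snapUUID)
	// Close removes the snapshot directory and, if this was the range's last
	// open scratch, the range directory too. Its error is dropped here for
	// brevity; real callers such as store_snapshot.go log it instead.
	defer func() { _ = scratch.Close() }()
	return scratch.WriteSST(ctx, data)
}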
125 changes: 123 additions & 2 deletions pkg/kv/kvserver/replica_sst_snapshot_storage_test.go
@@ -12,6 +12,9 @@ package kvserver
 import (
 	"context"
 	"io/ioutil"
+	"path/filepath"
+	"strconv"
+	"sync"
 	"testing"
 
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
@@ -20,6 +23,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
+	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/errors/oserror"
 	"github.com/stretchr/testify/require"
 	"golang.org/x/time/rate"
@@ -92,12 +96,129 @@ func TestSSTSnapshotStorage(t *testing.T) {
 	_, err = f.Write([]byte("foo"))
 	require.NoError(t, err)
 
-	// Check that Clear removes the directory.
-	require.NoError(t, scratch.Clear())
+	// Check that Close removes the snapshot directory as well as the range
+	// directory.
+	require.NoError(t, scratch.Close())
 	_, err = eng.Stat(scratch.snapDir)
 	if !oserror.IsNotExist(err) {
 		t.Fatalf("expected %s to not exist", scratch.snapDir)
 	}
+	rangeDir := filepath.Join(sstSnapshotStorage.dir, strconv.Itoa(int(scratch.rangeID)))
+	_, err = eng.Stat(rangeDir)
+	if !oserror.IsNotExist(err) {
+		t.Fatalf("expected %s to not exist", rangeDir)
+	}
+	require.NoError(t, sstSnapshotStorage.Clear())
+	_, err = eng.Stat(sstSnapshotStorage.dir)
+	if !oserror.IsNotExist(err) {
+		t.Fatalf("expected %s to not exist", sstSnapshotStorage.dir)
+	}
+}
+
+func TestSSTSnapshotStorageConcurrentRange(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	testRangeID := roachpb.RangeID(1)
+	testSnapUUID := uuid.Must(uuid.FromBytes([]byte("foobar1234567890")))
+	testSnapUUID2 := uuid.Must(uuid.FromBytes([]byte("foobar2345678910")))
+	testLimiter := rate.NewLimiter(rate.Inf, 0)
+
+	cleanup, eng := newOnDiskEngine(t)
+	defer cleanup()
+	defer eng.Close()
+
+	sstSnapshotStorage := NewSSTSnapshotStorage(eng, testLimiter)
+
+	runForSnap := func(snapUUID uuid.UUID) error {
+		scratch := sstSnapshotStorage.NewScratchSpace(testRangeID, snapUUID)
+
+		// Check that the storage lazily creates the directories on first write.
+		_, err := eng.Stat(scratch.snapDir)
+		if !oserror.IsNotExist(err) {
+			return errors.Errorf("expected %s to not exist", scratch.snapDir)
+		}
+
+		f, err := scratch.NewFile(ctx, 0)
+		require.NoError(t, err)
+		defer func() {
+			require.NoError(t, f.Close())
+		}()
+
+		// Check that even though the files aren't created, they are still recorded in SSTs().
+		require.Equal(t, len(scratch.SSTs()), 1)
+
+		// Check that the storage lazily creates the files on write.
+		for _, fileName := range scratch.SSTs() {
+			_, err := eng.Stat(fileName)
+			if !oserror.IsNotExist(err) {
+				return errors.Errorf("expected %s to not exist", fileName)
+			}
+		}
+
+		_, err = f.Write([]byte("foo"))
+		require.NoError(t, err)
+
+		// After writing to files, check that they have been flushed to disk.
+		for _, fileName := range scratch.SSTs() {
+			f, err := eng.Open(fileName)
+			require.NoError(t, err)
+			data, err := ioutil.ReadAll(f)
+			require.NoError(t, err)
+			require.Equal(t, data, []byte("foo"))
+			require.NoError(t, f.Close())
+		}
+
+		// Check that closing is idempotent.
+		require.NoError(t, f.Close())
+		require.NoError(t, f.Close())
+
+		// Check that writing to a closed file is an error.
+		_, err = f.Write([]byte("foo"))
+		require.EqualError(t, err, "file has already been closed")
+
+		// Check that closing an empty file is an error.
+		f, err = scratch.NewFile(ctx, 0)
+		require.NoError(t, err)
+		require.EqualError(t, f.Close(), "file is empty")
+		_, err = f.Write([]byte("foo"))
+		require.NoError(t, err)
+
+		// Check that Close removes the snapshot directory.
+		require.NoError(t, scratch.Close())
+		_, err = eng.Stat(scratch.snapDir)
+		if !oserror.IsNotExist(err) {
+			return errors.Errorf("expected %s to not exist", scratch.snapDir)
+		}
+		return nil
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(2)
+	errChan := make(chan error)
+	for _, snapID := range []uuid.UUID{testSnapUUID, testSnapUUID2} {
+		snapID := snapID
+		go func() {
+			defer wg.Done()
+			if err := runForSnap(snapID); err != nil {
+				errChan <- err
+			}
+		}()
+	}
+	wg.Wait()
+	select {
+	case err := <-errChan:
+		t.Fatal(err)
+	default:
+	}
+	// Ensure that the range directory was deleted after the scratches were
+	// closed.
+	rangeDir := filepath.Join(sstSnapshotStorage.dir, strconv.Itoa(int(testRangeID)))
+	_, err := eng.Stat(rangeDir)
+	if !oserror.IsNotExist(err) {
+		t.Fatalf("expected %s to not exist", rangeDir)
+	}
 	require.NoError(t, sstSnapshotStorage.Clear())
 	_, err = eng.Stat(sstSnapshotStorage.dir)
 	if !oserror.IsNotExist(err) {
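One detail worth flagging in the new test: errChan is unbuffered and is only drained after wg.Wait(), so a goroutine that actually failed would block on its send (the deferred wg.Done() cannot run while the send blocks) and stall the test until timeout rather than failing fast. A buffered channel sized to the worker count avoids that; a minimal sketch (illustrative, not code from this PR):

package main

import "sync"

// collectFirstErr runs each task in its own goroutine and returns the first
// error observed, if any. The buffered channel lets failed workers exit
// immediately instead of blocking on an unbuffered send.
func collectFirstErr(tasks ...func() error) error {
	var wg sync.WaitGroup
	errCh := make(chan error, len(tasks)) // buffered: sends never block
	for _, task := range tasks {
		task := task
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := task(); err != nil {
				errCh <- err
			}
		}()
	}
	wg.Wait()
	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}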
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_snapshot.go
@@ -513,7 +513,7 @@ func (kvSS *kvBatchSnapshotStrategy) Close(ctx context.Context) {
 		// A failure to clean up the storage is benign except that it will leak
 		// disk space (which is reclaimed on node restart). It is unexpected
 		// though, so log a warning.
-		if err := kvSS.scratch.Clear(); err != nil {
+		if err := kvSS.scratch.Close(); err != nil {
 			log.Warningf(ctx, "error closing kvBatchSnapshotStrategy: %v", err)
 		}
 	}
