Skip to content

Commit

Permalink
storage: unify storage/fs.FS and pebble/vfs.FS
Browse files Browse the repository at this point in the history
The storage/fs.FS had largely the same interface as vfs.FS. The storage/fs.FS
interface was intended as a temporary stepping stone to using pebble's vfs.FS
interface throughout Cockroach for all filesystem access. This commit unifies
the two.

Epic: None
Release note: None
  • Loading branch information
jbowens committed Mar 16, 2023
1 parent 683545c commit 78ad2a7
Show file tree
Hide file tree
Showing 25 changed files with 85 additions and 184 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/cliccl/ear_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"crypto/rand"
"fmt"
"path/filepath"
"sort"
"strings"
"testing"

Expand Down Expand Up @@ -56,6 +57,7 @@ func TestDecrypt(t *testing.T) {
// Find a manifest file to check.
files, err := p.List(dir)
require.NoError(t, err)
sort.Strings(files)
var manifestPath string
for _, basename := range files {
if strings.HasPrefix(basename, "MANIFEST-") {
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ go_library(
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_cockroachdb_pebble//objstorage",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//proto",
"@com_github_google_btree//:btree",
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/client_replica_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package kvserver_test

import (
"context"
"os"
"path/filepath"
"testing"
"time"
Expand Down Expand Up @@ -112,7 +113,7 @@ func TestReplicaGCQueueDropReplicaDirect(t *testing.T) {
if dir == "" {
t.Fatal("no sideloaded directory")
}
if err := eng.MkdirAll(dir); err != nil {
if err := eng.MkdirAll(dir, os.ModePerm); err != nil {
t.Fatal(err)
}
if err := fs.WriteFile(eng, filepath.Join(dir, "i1000000.t100000"), []byte("foo")); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/kvserverbase/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ go_library(
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/storage/fs",
"//pkg/util/errorutil",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
Expand All @@ -30,6 +29,7 @@ go_library(
"//pkg/util/timeutil",
"//pkg/util/tracing/tracingpb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_cockroachdb_redact//:redact",
"@org_golang_x_time//rate",
],
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/kvserverbase/syncing_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (

"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/vfs"
"golang.org/x/time/rate"
)

Expand Down Expand Up @@ -82,7 +82,7 @@ func WriteFileSyncing(
ctx context.Context,
filename string,
data []byte,
fs fs.FS,
fs vfs.FS,
perm os.FileMode,
settings *cluster.Settings,
limiter *rate.Limiter,
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/logstore/sideload_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package logstore
import (
"context"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
Expand Down Expand Up @@ -75,7 +76,7 @@ func NewDiskSideloadStorage(
}

func (ss *DiskSideloadStorage) createDir() error {
err := ss.eng.MkdirAll(ss.dir)
err := ss.eng.MkdirAll(ss.dir, os.ModePerm)
ss.dirCreated = ss.dirCreated || err == nil
return err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_consistency.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"crypto/sha512"
"encoding/binary"
"fmt"
"os"
"sync"
"time"

Expand Down Expand Up @@ -741,7 +742,7 @@ func (r *Replica) computeChecksumPostApply(
// certain of completing the check. Since we're already in a goroutine
// that's about to end, just sleep for a few seconds and then terminate.
auxDir := r.store.TODOEngine().GetAuxiliaryDir()
_ = r.store.TODOEngine().MkdirAll(auxDir)
_ = r.store.TODOEngine().MkdirAll(auxDir, os.ModePerm)
path := base.PreventedStartupFile(auxDir)

const attentionFmt = `ATTENTION:
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_corruption.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package kvserver
import (
"context"
"fmt"
"os"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
Expand Down Expand Up @@ -50,7 +51,7 @@ func (r *Replica) setCorruptRaftMuLocked(
r.mu.destroyStatus.Set(cErr, destroyReasonRemoved)

auxDir := r.store.TODOEngine().GetAuxiliaryDir()
_ = r.store.TODOEngine().MkdirAll(auxDir)
_ = r.store.TODOEngine().MkdirAll(auxDir, os.ModePerm)
path := base.PreventedStartupFile(auxDir)

preventStartupMsg := fmt.Sprintf(`ATTENTION:
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package kvserver

import (
"context"
"os"
"path/filepath"
"time"
"unsafe"
Expand Down Expand Up @@ -559,7 +560,7 @@ func addSSTablePreApply(

// TODO(tschottdorf): remove this once sideloaded storage guarantees its
// existence.
if err := eng.MkdirAll(filepath.Dir(ingestPath)); err != nil {
if err := eng.MkdirAll(filepath.Dir(ingestPath), os.ModePerm); err != nil {
panic(err)
}
if _, err := eng.Stat(ingestPath); err == nil {
Expand Down
8 changes: 5 additions & 3 deletions pkg/kv/kvserver/replica_sst_snapshot_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package kvserver
import (
"context"
"fmt"
"os"
"path/filepath"
"strconv"

Expand All @@ -24,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/vfs"
"golang.org/x/time/rate"
)

Expand Down Expand Up @@ -113,7 +115,7 @@ func (s *SSTSnapshotStorageScratch) filename(id int) string {
}

func (s *SSTSnapshotStorageScratch) createDir() error {
err := s.storage.engine.MkdirAll(s.snapDir)
err := s.storage.engine.MkdirAll(s.snapDir, os.ModePerm)
s.dirCreated = s.dirCreated || err == nil
return err
}
Expand Down Expand Up @@ -182,7 +184,7 @@ func (s *SSTSnapshotStorageScratch) Close() error {
type SSTSnapshotStorageFile struct {
scratch *SSTSnapshotStorageScratch
created bool
file fs.File
file vfs.File
filename string
ctx context.Context
bytesPerSync int64
Expand All @@ -207,7 +209,7 @@ func (f *SSTSnapshotStorageFile) ensureFile() error {
}
var err error
if f.bytesPerSync > 0 {
f.file, err = f.scratch.storage.engine.CreateWithSync(f.filename, int(f.bytesPerSync))
f.file, err = fs.CreateWithSync(f.scratch.storage.engine, f.filename, int(f.bytesPerSync))
} else {
f.file, err = f.scratch.storage.engine.Create(f.filename)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"context"
"fmt"
"math"
"os"
"path/filepath"
"runtime"
"sort"
Expand Down Expand Up @@ -3069,7 +3070,7 @@ func (s *Store) checkpointSpans(desc *roachpb.RangeDescriptor) []roachpb.Span {
// the provided key spans. If spans is empty, it includes the entire store.
func (s *Store) checkpoint(tag string, spans []roachpb.Span) (string, error) {
checkpointBase := s.checkpointsDir()
_ = s.TODOEngine().MkdirAll(checkpointBase)
_ = s.TODOEngine().MkdirAll(checkpointBase, os.ModePerm)
checkpointDir := filepath.Join(checkpointBase, tag)
if err := s.TODOEngine().CreateCheckpoint(checkpointDir, spans); err != nil {
return "", err
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/colcontainer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"//pkg/util/mon",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_golang_snappy//:snappy",
"@com_github_marusama_semaphore//:semaphore",
],
Expand All @@ -46,14 +47,14 @@ go_test(
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/types",
"//pkg/storage/fs",
"//pkg/testutils/colcontainerutils",
"//pkg/testutils/skip",
"//pkg/util/humanizeutil",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/randutil",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_marusama_semaphore//:semaphore",
"@com_github_stretchr_testify//require",
],
Expand Down
14 changes: 8 additions & 6 deletions pkg/sql/colcontainer/diskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"bytes"
"context"
"io"
"os"
"path/filepath"
"strconv"

Expand All @@ -26,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/vfs"
"github.com/golang/snappy"
)

Expand Down Expand Up @@ -181,15 +183,15 @@ type diskQueue struct {
// written before a compress and flush.
writeBufferLimit int
writeFileIdx int
writeFile fs.File
writeFile vfs.File
deserializerState struct {
*colserde.FileDeserializer
curBatch int
}
// readFileIdx is an index into the current file in files the deserializer is
// reading from.
readFileIdx int
readFile fs.File
readFile vfs.File
scratchDecompressedReadBytes []byte

diskAcc *mon.BoundAccount
Expand Down Expand Up @@ -298,7 +300,7 @@ func GetPatherFunc(f func(ctx context.Context) string) GetPather {
// DiskQueueCfg is a struct holding the configuration options for a DiskQueue.
type DiskQueueCfg struct {
// FS is the filesystem interface to use.
FS fs.FS
FS vfs.FS
// GetPather returns where the temporary directory that will contain this
// DiskQueue's files has been created. The directory name will be a UUID.
// Note that the directory is created lazily on the first call to GetPath.
Expand Down Expand Up @@ -411,7 +413,7 @@ func newDiskQueue(
if d.cfg.CacheMode != DiskQueueCacheModeIntertwinedCalls {
d.writeBufferLimit = d.cfg.BufferSizeBytes / 2
}
if err := cfg.FS.MkdirAll(filepath.Join(cfg.GetPather.GetPath(ctx), d.dirName)); err != nil {
if err := cfg.FS.MkdirAll(filepath.Join(cfg.GetPather.GetPath(ctx), d.dirName), os.ModePerm); err != nil {
return nil, err
}
// rotateFile will create a new file to write to.
Expand Down Expand Up @@ -492,7 +494,7 @@ func (d *diskQueue) Close(ctx context.Context) error {
// to write to.
func (d *diskQueue) rotateFile(ctx context.Context) error {
fName := filepath.Join(d.cfg.GetPather.GetPath(ctx), d.dirName, strconv.Itoa(d.seqNo))
f, err := d.cfg.FS.CreateWithSync(fName, bytesPerSync)
f, err := fs.CreateWithSync(d.cfg.FS, fName, bytesPerSync)
if err != nil {
return err
}
Expand Down Expand Up @@ -527,7 +529,7 @@ func (d *diskQueue) rotateFile(ctx context.Context) error {
return nil
}

func (d *diskQueue) resetWriters(f fs.File) error {
func (d *diskQueue) resetWriters(f vfs.File) error {
d.writer.reset(f)
return d.serializer.Reset(d.writer)
}
Expand Down
19 changes: 5 additions & 14 deletions pkg/sql/colcontainer/partitionedqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colcontainer"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/testutils/colcontainerutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/pebble/vfs"
"github.com/marusama/semaphore"
"github.com/stretchr/testify/require"
)

type fdCountingFSFile struct {
fs.File
vfs.File
onCloseCb func()
}

Expand All @@ -41,7 +41,7 @@ func (f *fdCountingFSFile) Close() error {
}

type fdCountingFS struct {
fs.FS
vfs.FS
writeFDs int
readFDs int
}
Expand All @@ -58,7 +58,7 @@ func (f *fdCountingFS) assertOpenFDs(
require.Equal(t, expectedReadFDs, f.readFDs)
}

func (f *fdCountingFS) Create(name string) (fs.File, error) {
func (f *fdCountingFS) Create(name string) (vfs.File, error) {
file, err := f.FS.Create(name)
if err != nil {
return nil, err
Expand All @@ -67,16 +67,7 @@ func (f *fdCountingFS) Create(name string) (fs.File, error) {
return &fdCountingFSFile{File: file, onCloseCb: func() { f.writeFDs-- }}, nil
}

func (f *fdCountingFS) CreateWithSync(name string, bytesPerSync int) (fs.File, error) {
file, err := f.FS.CreateWithSync(name, bytesPerSync)
if err != nil {
return nil, err
}
f.writeFDs++
return &fdCountingFSFile{File: file, onCloseCb: func() { f.writeFDs-- }}, nil
}

func (f *fdCountingFS) Open(name string) (fs.File, error) {
func (f *fdCountingFS) Open(name string, opts ...vfs.OpenOption) (vfs.File, error) {
file, err := f.FS.Open(name)
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package colflow

import (
"context"
"os"
"path/filepath"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -326,7 +327,7 @@ func (f *vectorizedFlow) GetPath(ctx context.Context) string {
tempDirName := f.GetID().String()
f.tempStorage.path = filepath.Join(f.Cfg.TempStoragePath, tempDirName)
log.VEventf(ctx, 1, "flow %s spilled to disk, stack trace: %s", f.ID, util.GetSmallTrace(2))
if err := f.Cfg.TempFS.MkdirAll(f.tempStorage.path); err != nil {
if err := f.Cfg.TempFS.MkdirAll(f.tempStorage.path, os.ModePerm); err != nil {
colexecerror.InternalError(errors.Wrap(err, "unable to create temporary storage directory"))
}
// We have just created the temporary directory which will be used for all
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/colflow/vectorized_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package colflow

import (
"context"
"os"
"path/filepath"
"sync"
"testing"
Expand Down Expand Up @@ -364,12 +365,12 @@ func TestVectorizedFlowTempDirectory(t *testing.T) {
errCh := make(chan error)
go func() {
createTempDir(ctx)
errCh <- ngn.MkdirAll(filepath.Join(vf.GetPath(ctx), "async"))
errCh <- ngn.MkdirAll(filepath.Join(vf.GetPath(ctx), "async"), os.ModePerm)
}()
createTempDir(ctx)
// Both goroutines should be able to create their subdirectories within the
// flow's temporary directory.
require.NoError(t, ngn.MkdirAll(filepath.Join(vf.GetPath(ctx), "main_goroutine")))
require.NoError(t, ngn.MkdirAll(filepath.Join(vf.GetPath(ctx), "main_goroutine"), os.ModePerm))
require.NoError(t, <-errCh)
vf.Cleanup(ctx)
checkDirs(t, 0)
Expand Down
Loading

0 comments on commit 78ad2a7

Please sign in to comment.