From def09e03935f91051b1ad7a61e2cac73d78b8053 Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Thu, 16 Mar 2023 11:27:16 -0400 Subject: [PATCH] storage: unify storage/fs.FS and pebble/vfs.FS The storage/fs.FS had largely the same interface as vfs.FS. The storage/fs.FS interface was intended as a temporary stepping stone to using pebble's vfs.FS interface throughout Cockroach for all filesystem access. This commit unifies the two. Epic: None Release note: None --- pkg/ccl/cliccl/ear_test.go | 2 + pkg/kv/kvserver/BUILD.bazel | 1 + pkg/kv/kvserver/client_replica_gc_test.go | 3 +- pkg/kv/kvserver/kvserverbase/BUILD.bazel | 2 +- pkg/kv/kvserver/kvserverbase/syncing_write.go | 4 +- pkg/kv/kvserver/logstore/sideload_disk.go | 3 +- pkg/kv/kvserver/replica_consistency.go | 3 +- pkg/kv/kvserver/replica_corruption.go | 3 +- pkg/kv/kvserver/replica_proposal.go | 3 +- .../kvserver/replica_sst_snapshot_storage.go | 8 +- pkg/kv/kvserver/store.go | 3 +- pkg/sql/colcontainer/BUILD.bazel | 3 +- pkg/sql/colcontainer/diskqueue.go | 14 ++-- pkg/sql/colcontainer/partitionedqueue_test.go | 19 ++--- pkg/sql/colflow/vectorized_flow.go | 3 +- pkg/sql/colflow/vectorized_flow_test.go | 5 +- pkg/sql/execinfra/BUILD.bazel | 2 +- pkg/sql/execinfra/server_config.go | 4 +- pkg/storage/bench_test.go | 2 +- pkg/storage/engine.go | 4 +- pkg/storage/engine_test.go | 25 +++---- pkg/storage/fs/fs.go | 69 ++++-------------- pkg/storage/pebble.go | 73 ++----------------- pkg/storage/temp_engine.go | 8 +- .../colcontainerutils/diskqueuecfg.go | 3 +- 25 files changed, 85 insertions(+), 184 deletions(-) diff --git a/pkg/ccl/cliccl/ear_test.go b/pkg/ccl/cliccl/ear_test.go index b29a06cc1ac5..9b2970867f4d 100644 --- a/pkg/ccl/cliccl/ear_test.go +++ b/pkg/ccl/cliccl/ear_test.go @@ -14,6 +14,7 @@ import ( "crypto/rand" "fmt" "path/filepath" + "sort" "strings" "testing" @@ -56,6 +57,7 @@ func TestDecrypt(t *testing.T) { // Find a manifest file to check. files, err := p.List(dir) require.NoError(t, err) + sort.Strings(files) var manifestPath string for _, basename := range files { if strings.HasPrefix(basename, "MANIFEST-") { diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index c244d53fe9e2..0524d6dbe822 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -212,6 +212,7 @@ go_library( "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_pebble//:pebble", "@com_github_cockroachdb_pebble//objstorage", + "@com_github_cockroachdb_pebble//vfs", "@com_github_cockroachdb_redact//:redact", "@com_github_gogo_protobuf//proto", "@com_github_google_btree//:btree", diff --git a/pkg/kv/kvserver/client_replica_gc_test.go b/pkg/kv/kvserver/client_replica_gc_test.go index 53c9e04e9d21..2b1dde8d0621 100644 --- a/pkg/kv/kvserver/client_replica_gc_test.go +++ b/pkg/kv/kvserver/client_replica_gc_test.go @@ -12,6 +12,7 @@ package kvserver_test import ( "context" + "os" "path/filepath" "testing" "time" @@ -112,7 +113,7 @@ func TestReplicaGCQueueDropReplicaDirect(t *testing.T) { if dir == "" { t.Fatal("no sideloaded directory") } - if err := eng.MkdirAll(dir); err != nil { + if err := eng.MkdirAll(dir, os.ModePerm); err != nil { t.Fatal(err) } if err := fs.WriteFile(eng, filepath.Join(dir, "i1000000.t100000"), []byte("foo")); err != nil { diff --git a/pkg/kv/kvserver/kvserverbase/BUILD.bazel b/pkg/kv/kvserver/kvserverbase/BUILD.bazel index f4a7f74ddcc7..1bb735f64459 100644 --- a/pkg/kv/kvserver/kvserverbase/BUILD.bazel +++ b/pkg/kv/kvserver/kvserverbase/BUILD.bazel @@ -21,7 +21,6 @@ go_library( "//pkg/roachpb", "//pkg/settings", "//pkg/settings/cluster", - "//pkg/storage/fs", "//pkg/util/errorutil", "//pkg/util/hlc", "//pkg/util/humanizeutil", @@ -30,6 +29,7 @@ go_library( "//pkg/util/timeutil", "//pkg/util/tracing/tracingpb", "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_pebble//vfs", "@com_github_cockroachdb_redact//:redact", "@org_golang_x_time//rate", ], diff --git a/pkg/kv/kvserver/kvserverbase/syncing_write.go b/pkg/kv/kvserver/kvserverbase/syncing_write.go index d44e865394b9..6231252b9627 100644 --- a/pkg/kv/kvserver/kvserverbase/syncing_write.go +++ b/pkg/kv/kvserver/kvserverbase/syncing_write.go @@ -19,10 +19,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/storage/fs" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/vfs" "golang.org/x/time/rate" ) @@ -82,7 +82,7 @@ func WriteFileSyncing( ctx context.Context, filename string, data []byte, - fs fs.FS, + fs vfs.FS, perm os.FileMode, settings *cluster.Settings, limiter *rate.Limiter, diff --git a/pkg/kv/kvserver/logstore/sideload_disk.go b/pkg/kv/kvserver/logstore/sideload_disk.go index 81496e0de454..e2bfc6765581 100644 --- a/pkg/kv/kvserver/logstore/sideload_disk.go +++ b/pkg/kv/kvserver/logstore/sideload_disk.go @@ -13,6 +13,7 @@ package logstore import ( "context" "fmt" + "os" "path/filepath" "strconv" "strings" @@ -75,7 +76,7 @@ func NewDiskSideloadStorage( } func (ss *DiskSideloadStorage) createDir() error { - err := ss.eng.MkdirAll(ss.dir) + err := ss.eng.MkdirAll(ss.dir, os.ModePerm) ss.dirCreated = ss.dirCreated || err == nil return err } diff --git a/pkg/kv/kvserver/replica_consistency.go b/pkg/kv/kvserver/replica_consistency.go index d4f4c9ad103b..99786884b2d4 100644 --- a/pkg/kv/kvserver/replica_consistency.go +++ b/pkg/kv/kvserver/replica_consistency.go @@ -15,6 +15,7 @@ import ( "crypto/sha512" "encoding/binary" "fmt" + "os" "sync" "time" @@ -741,7 +742,7 @@ func (r *Replica) computeChecksumPostApply( // certain of completing the check. Since we're already in a goroutine // that's about to end, just sleep for a few seconds and then terminate. auxDir := r.store.TODOEngine().GetAuxiliaryDir() - _ = r.store.TODOEngine().MkdirAll(auxDir) + _ = r.store.TODOEngine().MkdirAll(auxDir, os.ModePerm) path := base.PreventedStartupFile(auxDir) const attentionFmt = `ATTENTION: diff --git a/pkg/kv/kvserver/replica_corruption.go b/pkg/kv/kvserver/replica_corruption.go index c1d500bae05a..68fc59145fd4 100644 --- a/pkg/kv/kvserver/replica_corruption.go +++ b/pkg/kv/kvserver/replica_corruption.go @@ -13,6 +13,7 @@ package kvserver import ( "context" "fmt" + "os" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" @@ -50,7 +51,7 @@ func (r *Replica) setCorruptRaftMuLocked( r.mu.destroyStatus.Set(cErr, destroyReasonRemoved) auxDir := r.store.TODOEngine().GetAuxiliaryDir() - _ = r.store.TODOEngine().MkdirAll(auxDir) + _ = r.store.TODOEngine().MkdirAll(auxDir, os.ModePerm) path := base.PreventedStartupFile(auxDir) preventStartupMsg := fmt.Sprintf(`ATTENTION: diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index 5f8548109c45..d1496ff42d05 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -12,6 +12,7 @@ package kvserver import ( "context" + "os" "path/filepath" "time" "unsafe" @@ -561,7 +562,7 @@ func addSSTablePreApply( // TODO(tschottdorf): remove this once sideloaded storage guarantees its // existence. - if err := eng.MkdirAll(filepath.Dir(ingestPath)); err != nil { + if err := eng.MkdirAll(filepath.Dir(ingestPath), os.ModePerm); err != nil { panic(err) } if _, err := eng.Stat(ingestPath); err == nil { diff --git a/pkg/kv/kvserver/replica_sst_snapshot_storage.go b/pkg/kv/kvserver/replica_sst_snapshot_storage.go index b53a0cbf6eb3..d7ce8c6cc12f 100644 --- a/pkg/kv/kvserver/replica_sst_snapshot_storage.go +++ b/pkg/kv/kvserver/replica_sst_snapshot_storage.go @@ -13,6 +13,7 @@ package kvserver import ( "context" "fmt" + "os" "path/filepath" "strconv" @@ -24,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/objstorage" + "github.com/cockroachdb/pebble/vfs" "golang.org/x/time/rate" ) @@ -113,7 +115,7 @@ func (s *SSTSnapshotStorageScratch) filename(id int) string { } func (s *SSTSnapshotStorageScratch) createDir() error { - err := s.storage.engine.MkdirAll(s.snapDir) + err := s.storage.engine.MkdirAll(s.snapDir, os.ModePerm) s.dirCreated = s.dirCreated || err == nil return err } @@ -182,7 +184,7 @@ func (s *SSTSnapshotStorageScratch) Close() error { type SSTSnapshotStorageFile struct { scratch *SSTSnapshotStorageScratch created bool - file fs.File + file vfs.File filename string ctx context.Context bytesPerSync int64 @@ -207,7 +209,7 @@ func (f *SSTSnapshotStorageFile) ensureFile() error { } var err error if f.bytesPerSync > 0 { - f.file, err = f.scratch.storage.engine.CreateWithSync(f.filename, int(f.bytesPerSync)) + f.file, err = fs.CreateWithSync(f.scratch.storage.engine, f.filename, int(f.bytesPerSync)) } else { f.file, err = f.scratch.storage.engine.Create(f.filename) } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 35622c3285e3..21e89bb7551b 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -15,6 +15,7 @@ import ( "context" "fmt" "math" + "os" "path/filepath" "runtime" "sort" @@ -3083,7 +3084,7 @@ func (s *Store) checkpointSpans(desc *roachpb.RangeDescriptor) []roachpb.Span { // the provided key spans. If spans is empty, it includes the entire store. func (s *Store) checkpoint(tag string, spans []roachpb.Span) (string, error) { checkpointBase := s.checkpointsDir() - _ = s.TODOEngine().MkdirAll(checkpointBase) + _ = s.TODOEngine().MkdirAll(checkpointBase, os.ModePerm) // Create the checkpoint in a "pending" directory first. If we fail midway, it // should be clear that the directory contains an incomplete checkpoint. pendingDir := filepath.Join(checkpointBase, tag+"_pending") diff --git a/pkg/sql/colcontainer/BUILD.bazel b/pkg/sql/colcontainer/BUILD.bazel index 193404fb093d..767aa256c692 100644 --- a/pkg/sql/colcontainer/BUILD.bazel +++ b/pkg/sql/colcontainer/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "//pkg/util/mon", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_pebble//vfs", "@com_github_golang_snappy//:snappy", "@com_github_marusama_semaphore//:semaphore", ], @@ -46,7 +47,6 @@ go_test( "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", "//pkg/sql/types", - "//pkg/storage/fs", "//pkg/testutils/colcontainerutils", "//pkg/testutils/skip", "//pkg/util/humanizeutil", @@ -54,6 +54,7 @@ go_test( "//pkg/util/log", "//pkg/util/mon", "//pkg/util/randutil", + "@com_github_cockroachdb_pebble//vfs", "@com_github_marusama_semaphore//:semaphore", "@com_github_stretchr_testify//require", ], diff --git a/pkg/sql/colcontainer/diskqueue.go b/pkg/sql/colcontainer/diskqueue.go index 5e0a000f062b..6170b45754c8 100644 --- a/pkg/sql/colcontainer/diskqueue.go +++ b/pkg/sql/colcontainer/diskqueue.go @@ -14,6 +14,7 @@ import ( "bytes" "context" "io" + "os" "path/filepath" "strconv" @@ -26,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/vfs" "github.com/golang/snappy" ) @@ -181,7 +183,7 @@ type diskQueue struct { // written before a compress and flush. writeBufferLimit int writeFileIdx int - writeFile fs.File + writeFile vfs.File deserializerState struct { *colserde.FileDeserializer curBatch int @@ -189,7 +191,7 @@ type diskQueue struct { // readFileIdx is an index into the current file in files the deserializer is // reading from. readFileIdx int - readFile fs.File + readFile vfs.File scratchDecompressedReadBytes []byte diskAcc *mon.BoundAccount @@ -298,7 +300,7 @@ func GetPatherFunc(f func(ctx context.Context) string) GetPather { // DiskQueueCfg is a struct holding the configuration options for a DiskQueue. type DiskQueueCfg struct { // FS is the filesystem interface to use. - FS fs.FS + FS vfs.FS // GetPather returns where the temporary directory that will contain this // DiskQueue's files has been created. The directory name will be a UUID. // Note that the directory is created lazily on the first call to GetPath. @@ -411,7 +413,7 @@ func newDiskQueue( if d.cfg.CacheMode != DiskQueueCacheModeIntertwinedCalls { d.writeBufferLimit = d.cfg.BufferSizeBytes / 2 } - if err := cfg.FS.MkdirAll(filepath.Join(cfg.GetPather.GetPath(ctx), d.dirName)); err != nil { + if err := cfg.FS.MkdirAll(filepath.Join(cfg.GetPather.GetPath(ctx), d.dirName), os.ModePerm); err != nil { return nil, err } // rotateFile will create a new file to write to. @@ -492,7 +494,7 @@ func (d *diskQueue) Close(ctx context.Context) error { // to write to. func (d *diskQueue) rotateFile(ctx context.Context) error { fName := filepath.Join(d.cfg.GetPather.GetPath(ctx), d.dirName, strconv.Itoa(d.seqNo)) - f, err := d.cfg.FS.CreateWithSync(fName, bytesPerSync) + f, err := fs.CreateWithSync(d.cfg.FS, fName, bytesPerSync) if err != nil { return err } @@ -527,7 +529,7 @@ func (d *diskQueue) rotateFile(ctx context.Context) error { return nil } -func (d *diskQueue) resetWriters(f fs.File) error { +func (d *diskQueue) resetWriters(f vfs.File) error { d.writer.reset(f) return d.serializer.Reset(d.writer) } diff --git a/pkg/sql/colcontainer/partitionedqueue_test.go b/pkg/sql/colcontainer/partitionedqueue_test.go index 8538a931ab80..3efbe2d9d6de 100644 --- a/pkg/sql/colcontainer/partitionedqueue_test.go +++ b/pkg/sql/colcontainer/partitionedqueue_test.go @@ -19,16 +19,16 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/colcontainer" "github.com/cockroachdb/cockroach/pkg/sql/colexecop" "github.com/cockroachdb/cockroach/pkg/sql/types" - "github.com/cockroachdb/cockroach/pkg/storage/fs" "github.com/cockroachdb/cockroach/pkg/testutils/colcontainerutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/cockroachdb/pebble/vfs" "github.com/marusama/semaphore" "github.com/stretchr/testify/require" ) type fdCountingFSFile struct { - fs.File + vfs.File onCloseCb func() } @@ -41,7 +41,7 @@ func (f *fdCountingFSFile) Close() error { } type fdCountingFS struct { - fs.FS + vfs.FS writeFDs int readFDs int } @@ -58,7 +58,7 @@ func (f *fdCountingFS) assertOpenFDs( require.Equal(t, expectedReadFDs, f.readFDs) } -func (f *fdCountingFS) Create(name string) (fs.File, error) { +func (f *fdCountingFS) Create(name string) (vfs.File, error) { file, err := f.FS.Create(name) if err != nil { return nil, err @@ -67,16 +67,7 @@ func (f *fdCountingFS) Create(name string) (fs.File, error) { return &fdCountingFSFile{File: file, onCloseCb: func() { f.writeFDs-- }}, nil } -func (f *fdCountingFS) CreateWithSync(name string, bytesPerSync int) (fs.File, error) { - file, err := f.FS.CreateWithSync(name, bytesPerSync) - if err != nil { - return nil, err - } - f.writeFDs++ - return &fdCountingFSFile{File: file, onCloseCb: func() { f.writeFDs-- }}, nil -} - -func (f *fdCountingFS) Open(name string) (fs.File, error) { +func (f *fdCountingFS) Open(name string, opts ...vfs.OpenOption) (vfs.File, error) { file, err := f.FS.Open(name) if err != nil { return nil, err diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 7100bb9b1b43..bf0988e126b9 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -12,6 +12,7 @@ package colflow import ( "context" + "os" "path/filepath" "sync" "sync/atomic" @@ -326,7 +327,7 @@ func (f *vectorizedFlow) GetPath(ctx context.Context) string { tempDirName := f.GetID().String() f.tempStorage.path = filepath.Join(f.Cfg.TempStoragePath, tempDirName) log.VEventf(ctx, 1, "flow %s spilled to disk, stack trace: %s", f.ID, util.GetSmallTrace(2)) - if err := f.Cfg.TempFS.MkdirAll(f.tempStorage.path); err != nil { + if err := f.Cfg.TempFS.MkdirAll(f.tempStorage.path, os.ModePerm); err != nil { colexecerror.InternalError(errors.Wrap(err, "unable to create temporary storage directory")) } // We have just created the temporary directory which will be used for all diff --git a/pkg/sql/colflow/vectorized_flow_test.go b/pkg/sql/colflow/vectorized_flow_test.go index 47dfc4d4af73..534da28773a7 100644 --- a/pkg/sql/colflow/vectorized_flow_test.go +++ b/pkg/sql/colflow/vectorized_flow_test.go @@ -12,6 +12,7 @@ package colflow import ( "context" + "os" "path/filepath" "sync" "testing" @@ -364,12 +365,12 @@ func TestVectorizedFlowTempDirectory(t *testing.T) { errCh := make(chan error) go func() { createTempDir(ctx) - errCh <- ngn.MkdirAll(filepath.Join(vf.GetPath(ctx), "async")) + errCh <- ngn.MkdirAll(filepath.Join(vf.GetPath(ctx), "async"), os.ModePerm) }() createTempDir(ctx) // Both goroutines should be able to create their subdirectories within the // flow's temporary directory. - require.NoError(t, ngn.MkdirAll(filepath.Join(vf.GetPath(ctx), "main_goroutine"))) + require.NoError(t, ngn.MkdirAll(filepath.Join(vf.GetPath(ctx), "main_goroutine"), os.ModePerm)) require.NoError(t, <-errCh) vf.Cleanup(ctx) checkDirs(t, 0) diff --git a/pkg/sql/execinfra/BUILD.bazel b/pkg/sql/execinfra/BUILD.bazel index 37292614e0fa..6e318108c23e 100644 --- a/pkg/sql/execinfra/BUILD.bazel +++ b/pkg/sql/execinfra/BUILD.bazel @@ -60,7 +60,6 @@ go_library( "//pkg/sql/sqlliveness", "//pkg/sql/stats", "//pkg/sql/types", - "//pkg/storage/fs", "//pkg/util/admission", "//pkg/util/buildutil", "//pkg/util/intsets", @@ -77,6 +76,7 @@ go_library( "//pkg/util/tracing/tracingpb", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_pebble//vfs", "@com_github_cockroachdb_redact//:redact", "@com_github_marusama_semaphore//:semaphore", "@io_opentelemetry_go_otel//attribute", diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index 4f907f7da134..e33fd9f3533a 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -37,7 +37,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/sql/stats" - "github.com/cockroachdb/cockroach/pkg/storage/fs" "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/limit" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -45,6 +44,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/vfs" "github.com/marusama/semaphore" ) @@ -95,7 +95,7 @@ type ServerConfig struct { // TempFS is used by the vectorized execution engine to store columns when the // working set is larger than can be stored in memory. - TempFS fs.FS + TempFS vfs.FS // VecFDSemaphore is a weighted semaphore that restricts the number of open // file descriptors in the vectorized engine. diff --git a/pkg/storage/bench_test.go b/pkg/storage/bench_test.go index d747464f79b6..496563bc3020 100644 --- a/pkg/storage/bench_test.go +++ b/pkg/storage/bench_test.go @@ -1927,7 +1927,7 @@ func BenchmarkMVCCScannerWithIntentsAndVersions(b *testing.B) { return cmp < 0 }) sstFileName := fmt.Sprintf("tmp-ingest-%d", i) - sstFile, err := eng.fs.Create(sstFileName) + sstFile, err := eng.Create(sstFileName) require.NoError(b, err) // No improvement with v3 since the multiple versions are in different // files. diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index d4d621a2d0df..08e5f4af6c90 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -20,7 +20,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" - "github.com/cockroachdb/cockroach/pkg/storage/fs" "github.com/cockroachdb/cockroach/pkg/storage/pebbleiter" "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -31,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/vfs" prometheusgo "github.com/prometheus/client_model/go" ) @@ -941,7 +941,7 @@ type Engine interface { // be invoked while holding mutexes). RegisterFlushCompletedCallback(cb func()) // Filesystem functionality. - fs.FS + vfs.FS // CreateCheckpoint creates a checkpoint of the engine in the given directory, // which must not exist. The directory should be on the same file system so // that hard links can be used. If spans is not empty, the checkpoint excludes diff --git a/pkg/storage/engine_test.go b/pkg/storage/engine_test.go index c53cb0470700..9e2de5259227 100644 --- a/pkg/storage/engine_test.go +++ b/pkg/storage/engine_test.go @@ -18,6 +18,7 @@ import ( "io" "math" "math/rand" + "os" "path/filepath" "reflect" "sort" @@ -40,6 +41,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/errors/oserror" "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/vfs" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" @@ -1056,12 +1058,6 @@ func TestIngestDelayLimit(t *testing.T) { } } -type stringSorter []string - -func (s stringSorter) Len() int { return len(s) } -func (s stringSorter) Swap(i int, j int) { s[i], s[j] = s[j], s[i] } -func (s stringSorter) Less(i int, j int) bool { return strings.Compare(s[i], s[j]) < 0 } - func TestEngineFS(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1114,7 +1110,7 @@ func TestEngineFS(t *testing.T) { "9h: delete /dir1", } - var f fs.File + var f vfs.File for _, tc := range testCases { s := strings.Split(tc, " ")[1:] @@ -1132,14 +1128,14 @@ func TestEngineFS(t *testing.T) { } var ( - g fs.File + g vfs.File err error ) switch s[0] { case "create": g, err = e.Create(s[1]) case "create-with-sync": - g, err = e.CreateWithSync(s[1], 1) + g, err = fs.CreateWithSync(e, s[1], 1) case "link": err = e.Link(s[1], s[2]) case "open": @@ -1151,13 +1147,13 @@ func TestEngineFS(t *testing.T) { case "rename": err = e.Rename(s[1], s[2]) case "create-dir": - err = e.MkdirAll(s[1]) + err = e.MkdirAll(s[1], os.ModePerm) case "list-dir": result, err := e.List(s[1]) if err != nil { break } - sort.Sort(stringSorter(result)) + sort.Strings(result) got := strings.Join(result, ",") want := s[3] if got != want { @@ -1304,6 +1300,7 @@ func TestFS(t *testing.T) { t.Helper() got, err := fs.List(dir) + sort.Strings(got) require.NoError(t, err) if !reflect.DeepEqual(got, want) { t.Fatalf("fs.List(%q) = %#v, want %#v", dir, got, want) @@ -1311,7 +1308,7 @@ func TestFS(t *testing.T) { } // Create a/ and assert that it's empty. - require.NoError(t, fs.MkdirAll(path("a"))) + require.NoError(t, fs.MkdirAll(path("a"), os.ModePerm)) expectLS(path("a"), []string{}) if _, err := fs.Stat(path("a/b/c")); !oserror.IsNotExist(err) { t.Fatal(`fs.Stat("a/b/c") should not exist`) @@ -1319,8 +1316,8 @@ func TestFS(t *testing.T) { // Create a/b/ and a/b/c/ in a single MkdirAll call. // Then ensure that a duplicate call returns a nil error. - require.NoError(t, fs.MkdirAll(path("a/b/c"))) - require.NoError(t, fs.MkdirAll(path("a/b/c"))) + require.NoError(t, fs.MkdirAll(path("a/b/c"), os.ModePerm)) + require.NoError(t, fs.MkdirAll(path("a/b/c"), os.ModePerm)) expectLS(path("a"), []string{"b"}) expectLS(path("a/b"), []string{"c"}) expectLS(path("a/b/c"), []string{}) diff --git a/pkg/storage/fs/fs.go b/pkg/storage/fs/fs.go index 3d626c2215e4..a64ad6ed54e7 100644 --- a/pkg/storage/fs/fs.go +++ b/pkg/storage/fs/fs.go @@ -12,66 +12,23 @@ package fs import ( "io" - "os" -) - -// File and FS are a partial attempt at offering the Pebble vfs.FS interface. Given the constraints -// of the RocksDB Env interface we've chosen to only include what is easy to implement. Additionally, -// it does not try to subsume all the file related functionality already in the Engine interface. -// It seems preferable to do a final cleanup only when the implementation can simply use Pebble's -// implementation of vfs.FS. At that point the following interface will become a superset of vfs.FS. -type File interface { - io.ReadWriteCloser - io.ReaderAt - Sync() error -} - -// FS provides a filesystem interface. -type FS interface { - // Create creates the named file for writing, removing the file at - // the provided path if one already exists. - Create(name string) (File, error) - - // CreateWithSync is similar to Create, but the file is periodically - // synced whenever more than bytesPerSync bytes accumulate. This syncing - // does not provide any persistency guarantees, but can prevent latency - // spikes. - CreateWithSync(name string, bytesPerSync int) (File, error) - - // Link creates newname as a hard link to the oldname file. - Link(oldname, newname string) error - - // Open opens the named file for reading. - Open(name string) (File, error) - - // OpenDir opens the named directory for syncing. - OpenDir(name string) (File, error) - // Remove removes the named file. If the file with given name doesn't - // exist, return an error that returns true from oserror.IsNotExist(). - Remove(name string) error - - // Rename renames a file. It overwrites the file at newname if one exists, - // the same as os.Rename. - Rename(oldname, newname string) error - - // MkdirAll creates the named dir and its parents. Does nothing if the - // directory already exists. - MkdirAll(name string) error - - // RemoveAll deletes the path and any children it contains. - RemoveAll(dir string) error - - // List returns a listing of the given directory. The names returned are - // relative to the directory. - List(name string) ([]string, error) + "github.com/cockroachdb/pebble/vfs" +) - // Stat returns a FileInfo describing the named file. - Stat(name string) (os.FileInfo, error) +// CreateWithSync creates a file wrapped with logic to periodically sync +// whenever more than bytesPerSync bytes accumulate. This syncing does not +// provide any persistency guarantees, but can prevent latency spikes. +func CreateWithSync(fs vfs.FS, name string, bytesPerSync int) (vfs.File, error) { + f, err := fs.Create(name) + if err != nil { + return nil, err + } + return vfs.NewSyncingFile(f, vfs.SyncingFileOptions{BytesPerSync: bytesPerSync}), nil } // WriteFile writes data to a file named by filename. -func WriteFile(fs FS, filename string, data []byte) error { +func WriteFile(fs vfs.FS, filename string, data []byte) error { f, err := fs.Create(filename) if err != nil { return err @@ -84,7 +41,7 @@ func WriteFile(fs FS, filename string, data []byte) error { } // ReadFile reads data from a file named by filename. -func ReadFile(fs FS, filename string) ([]byte, error) { +func ReadFile(fs vfs.FS, filename string) ([]byte, error) { file, err := fs.Open(filename) if err != nil { return nil, err diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index d24762cbb81a..7894cae53d6a 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -19,7 +19,6 @@ import ( "math" "os" "path/filepath" - "sort" "strconv" "strings" "sync" @@ -731,6 +730,8 @@ type EncryptionStatsHandler interface { // Pebble is a wrapper around a Pebble database instance. type Pebble struct { + vfs.FS + atomic struct { // compactionConcurrency is the current compaction concurrency set on // the Pebble store. The compactionConcurrency option in the Pebble @@ -768,7 +769,6 @@ type Pebble struct { sharedBytesWritten int64 // Relevant options copied over from pebble.Options. - fs vfs.FS unencryptedFS vfs.FS logCtx context.Context logger pebble.LoggerAndTracer @@ -1012,6 +1012,7 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) { storeProps := computeStoreProperties(ctx, cfg.Dir, opts.ReadOnly, encryptionEnv != nil /* encryptionEnabled */) p = &Pebble{ + FS: opts.FS, readOnly: opts.ReadOnly, path: cfg.Dir, auxDir: auxDir, @@ -1023,7 +1024,6 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) { settings: cfg.Settings, encryption: encryptionEnv, fileRegistry: fileRegistry, - fs: opts.FS, unencryptedFS: unencryptedFS, logger: opts.LoggerAndTracer, logCtx: logCtx, @@ -1812,7 +1812,7 @@ func (p *Pebble) GetEnvStats() (*EnvStats, error) { } stats.ActiveKeyFiles++ - filename := p.fs.PathBase(filePath) + filename := p.FS.PathBase(filePath) numStr := strings.TrimSuffix(filename, ".sst") if len(numStr) == len(filename) { continue // not a sstable @@ -1909,68 +1909,7 @@ func (p *Pebble) RegisterFlushCompletedCallback(cb func()) { p.mu.Unlock() } -// Remove implements the FS interface. -func (p *Pebble) Remove(filename string) error { - return p.fs.Remove(filename) -} - -// RemoveAll implements the Engine interface. -func (p *Pebble) RemoveAll(dir string) error { - return p.fs.RemoveAll(dir) -} - -// Link implements the FS interface. -func (p *Pebble) Link(oldname, newname string) error { - return p.fs.Link(oldname, newname) -} - -var _ fs.FS = &Pebble{} - -// Create implements the FS interface. -func (p *Pebble) Create(name string) (fs.File, error) { - return p.fs.Create(name) -} - -// CreateWithSync implements the FS interface. -func (p *Pebble) CreateWithSync(name string, bytesPerSync int) (fs.File, error) { - f, err := p.fs.Create(name) - if err != nil { - return nil, err - } - return vfs.NewSyncingFile(f, vfs.SyncingFileOptions{BytesPerSync: bytesPerSync}), nil -} - -// Open implements the FS interface. -func (p *Pebble) Open(name string) (fs.File, error) { - return p.fs.Open(name) -} - -// OpenDir implements the FS interface. -func (p *Pebble) OpenDir(name string) (fs.File, error) { - return p.fs.OpenDir(name) -} - -// Rename implements the FS interface. -func (p *Pebble) Rename(oldname, newname string) error { - return p.fs.Rename(oldname, newname) -} - -// MkdirAll implements the FS interface. -func (p *Pebble) MkdirAll(name string) error { - return p.fs.MkdirAll(name, 0755) -} - -// List implements the FS interface. -func (p *Pebble) List(name string) ([]string, error) { - dirents, err := p.fs.List(name) - sort.Strings(dirents) - return dirents, err -} - -// Stat implements the FS interface. -func (p *Pebble) Stat(name string) (os.FileInfo, error) { - return p.fs.Stat(name) -} +var _ vfs.FS = &Pebble{} func checkpointSpansNote(spans []roachpb.Span) []byte { note := "CRDB spans:\n" @@ -2004,7 +1943,7 @@ func (p *Pebble) CreateCheckpoint(dir string, spans []roachpb.Span) error { // TODO(#90543, cockroachdb/pebble#2285): move spans info to Pebble manifest. if len(spans) > 0 { if err := fs.SafeWriteToFile( - p.fs, dir, p.fs.PathJoin(dir, "checkpoint.txt"), + p.FS, dir, p.FS.PathJoin(dir, "checkpoint.txt"), checkpointSpansNote(spans), ); err != nil { return err diff --git a/pkg/storage/temp_engine.go b/pkg/storage/temp_engine.go index c9545c82dc8a..acc375d37892 100644 --- a/pkg/storage/temp_engine.go +++ b/pkg/storage/temp_engine.go @@ -16,16 +16,16 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/diskmap" - "github.com/cockroachdb/cockroach/pkg/storage/fs" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/vfs" ) // NewTempEngine creates a new engine for DistSQL processors to use when // the working set is larger than can be stored in memory. func NewTempEngine( ctx context.Context, tempStorage base.TempStorageConfig, storeSpec base.StoreSpec, -) (diskmap.Factory, fs.FS, error) { +) (diskmap.Factory, vfs.FS, error) { return NewPebbleTempEngine(ctx, tempStorage, storeSpec) } @@ -58,13 +58,13 @@ func (r *pebbleTempEngine) NewSortedDiskMultiMap() diskmap.SortedDiskMap { // when the working set is larger than can be stored in memory. func NewPebbleTempEngine( ctx context.Context, tempStorage base.TempStorageConfig, storeSpec base.StoreSpec, -) (diskmap.Factory, fs.FS, error) { +) (diskmap.Factory, vfs.FS, error) { return newPebbleTempEngine(ctx, tempStorage, storeSpec) } func newPebbleTempEngine( ctx context.Context, tempStorage base.TempStorageConfig, storeSpec base.StoreSpec, -) (*pebbleTempEngine, fs.FS, error) { +) (*pebbleTempEngine, vfs.FS, error) { var loc Location var cacheSize int64 = 128 << 20 // 128 MiB, arbitrary, but not "too big" if tempStorage.InMemory { diff --git a/pkg/testutils/colcontainerutils/diskqueuecfg.go b/pkg/testutils/colcontainerutils/diskqueuecfg.go index 488032af484a..deaabb7afc72 100644 --- a/pkg/testutils/colcontainerutils/diskqueuecfg.go +++ b/pkg/testutils/colcontainerutils/diskqueuecfg.go @@ -12,6 +12,7 @@ package colcontainerutils import ( "context" + "os" "testing" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -54,7 +55,7 @@ func NewTestingDiskQueueCfg(t testing.TB, inMem bool) (colcontainer.DiskQueueCfg } if inMem { - if err := ngn.MkdirAll(inMemDirName); err != nil { + if err := ngn.MkdirAll(inMemDirName, os.ModePerm); err != nil { t.Fatal(err) } }