Skip to content

Commit

Permalink
storage: don't modify the given cfg.Opts
Browse files Browse the repository at this point in the history
This change improves the `NewPebble` code to not modify the given
`cfg.Opts`. Such behavior is surprising and can trip up tests that
reuse the same config.

Also, `ResolveEncryptedEnvOptions` and `wrapFilesystemMiddleware` no
longer modify the Options directly; and `CheckNoRegistryFile` is now a
standalone function.

Release note: None
Epic: none
  • Loading branch information
RaduBerinde committed Feb 8, 2023
1 parent 7ee61fb commit f314232
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 77 deletions.
15 changes: 6 additions & 9 deletions pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -1532,17 +1532,14 @@ func pebbleCryptoInitializer() error {
}
}

cfg := storage.PebbleConfig{
StorageConfig: storageConfig,
Opts: storage.DefaultPebbleOptions(),
}

// This has the side effect of storing the encrypted FS into cfg.Opts.FS.
_, _, err := storage.ResolveEncryptedEnvOptions(&cfg)
_, encryptedEnv, err := storage.ResolveEncryptedEnvOptions(&storageConfig, vfs.Default, false /* readOnly */)
if err != nil {
return err
}

pebbleToolFS.set(cfg.Opts.FS)
if encryptedEnv != nil {
pebbleToolFS.set(encryptedEnv.FS)
} else {
pebbleToolFS.set(vfs.Default)
}
return nil
}
117 changes: 59 additions & 58 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,10 +619,9 @@ func shortAttributeExtractorForValues(
}

// wrapFilesystemMiddleware wraps the Option's vfs.FS with disk-health checking
// and ENOSPC detection. It mutates the provided options to set the FS and
// returns a Closer that should be invoked when the filesystem will no longer be
// used.
func wrapFilesystemMiddleware(opts *pebble.Options) io.Closer {
// and ENOSPC detection. Returns the new FS and a Closer that should be invoked
// when the filesystem will no longer be used.
func wrapFilesystemMiddleware(opts *pebble.Options) (vfs.FS, io.Closer) {
// Set disk-health check interval to min(5s, maxSyncDurationDefault). This
// is mostly to ease testing; the default of 5s is too infrequent to test
// conveniently. See the disk-stalled roachtest for an example of how this
Expand All @@ -634,8 +633,7 @@ func wrapFilesystemMiddleware(opts *pebble.Options) io.Closer {
// Instantiate a file system with disk health checking enabled. This FS
// wraps the filesystem with a layer that times all write-oriented
// operations.
var closer io.Closer
opts.FS, closer = vfs.WithDiskHealthChecks(opts.FS, diskHealthCheckInterval,
fs, closer := vfs.WithDiskHealthChecks(opts.FS, diskHealthCheckInterval,
func(name string, opType vfs.OpType, duration time.Duration) {
opts.EventListener.DiskSlow(pebble.DiskSlowInfo{
Path: name,
Expand All @@ -644,10 +642,10 @@ func wrapFilesystemMiddleware(opts *pebble.Options) io.Closer {
})
})
// If we encounter ENOSPC, exit with an informative exit code.
opts.FS = vfs.OnDiskFull(opts.FS, func() {
fs = vfs.OnDiskFull(fs, func() {
exit.WithCode(exit.DiskFull())
})
return closer
return fs, closer
}

type pebbleLogger struct {
Expand Down Expand Up @@ -812,21 +810,23 @@ func (p *Pebble) SetStoreID(ctx context.Context, storeID int32) {
p.storeIDPebbleLog.Set(ctx, storeID)
}

// ResolveEncryptedEnvOptions fills in cfg.Opts.FS with an encrypted vfs if this
// store has encryption-at-rest enabled. Also returns the associated file
// registry and EncryptionStatsHandler.
func ResolveEncryptedEnvOptions(cfg *PebbleConfig) (*PebbleFileRegistry, *EncryptionEnv, error) {
fileRegistry := &PebbleFileRegistry{FS: cfg.Opts.FS, DBDir: cfg.Dir, ReadOnly: cfg.Opts.ReadOnly}
// ResolveEncryptedEnvOptions creates the EncryptionEnv and associated file
// registry if this store has encryption-at-rest enabled; otherwise returns a
// nil EncryptionEnv.
func ResolveEncryptedEnvOptions(
cfg *base.StorageConfig, fs vfs.FS, readOnly bool,
) (*PebbleFileRegistry, *EncryptionEnv, error) {
var fileRegistry *PebbleFileRegistry
if cfg.UseFileRegistry {
fileRegistry = &PebbleFileRegistry{FS: fs, DBDir: cfg.Dir, ReadOnly: readOnly}
if err := fileRegistry.Load(); err != nil {
return nil, nil, err
}
} else {
if err := fileRegistry.CheckNoRegistryFile(); err != nil {
if err := CheckNoRegistryFile(fs, cfg.Dir); err != nil {
return nil, nil, fmt.Errorf("encryption was used on this store before, but no encryption flags " +
"specified. You need a CCL build and must fully specify the --enterprise-encryption flag")
}
fileRegistry = nil
}

var env *EncryptionEnv
Expand All @@ -840,37 +840,40 @@ func ResolveEncryptedEnvOptions(cfg *PebbleConfig) (*PebbleFileRegistry, *Encryp
}
var err error
env, err = NewEncryptedEnvFunc(
cfg.Opts.FS,
fs,
fileRegistry,
cfg.Dir,
cfg.Opts.ReadOnly,
readOnly,
cfg.EncryptionOptions,
)
if err != nil {
return nil, nil, err
}
// TODO(jackson): Should this just return an EncryptionEnv,
// rather than mutating cfg.Opts?
cfg.Opts.FS = env.FS
}
return fileRegistry, env, nil
}

// NewPebble creates a new Pebble instance, at the specified path.
func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) {
// pebble.Open also calls EnsureDefaults, but only after doing a clone. Call
// EnsureDefaults beforehand so we have a matching cfg here for when we save
// cfg.FS and cfg.ReadOnly later on.
if cfg.Opts == nil {
cfg.Opts = DefaultPebbleOptions()
}
if cfg.Settings == nil {
return nil, errors.AssertionFailedf("NewPebble requires cfg.Settings to be set")
}

// Initialize the FS, wrapping it with disk health-checking and
// ENOSPC-detection.
filesystemCloser := wrapFilesystemMiddleware(cfg.Opts)
var opts *pebble.Options
if cfg.Opts == nil {
opts = DefaultPebbleOptions()
} else {
// Clone the given options so that we are free to modify them.
opts = cfg.Opts.Clone()
}

// pebble.Open also calls EnsureDefaults, but only after doing a clone. Call
// EnsureDefaults here to make sure we have a working FS.
opts.EnsureDefaults()

// Wrap the FS with disk health-checking and ENOSPC-detection.
var filesystemCloser io.Closer
opts.FS, filesystemCloser = wrapFilesystemMiddleware(opts)
defer func() {
if err != nil {
filesystemCloser.Close()
Expand All @@ -886,42 +889,40 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) {
logCtx = logtags.AddTag(logCtx, "s", storeIDContainer)
logCtx = logtags.AddTag(logCtx, "pebble", nil)

cfg.Opts.EnsureDefaults()
cfg.Opts.ErrorIfNotExists = cfg.MustExist
cfg.Opts.WALMinSyncInterval = func() time.Duration {
opts.ErrorIfNotExists = cfg.MustExist
opts.WALMinSyncInterval = func() time.Duration {
return minWALSyncInterval.Get(&cfg.Settings.SV)
}
cfg.Opts.Experimental.EnableValueBlocks = func() bool {
opts.Experimental.EnableValueBlocks = func() bool {
version := cfg.Settings.Version.ActiveVersionOrEmpty(logCtx)
return !version.Less(clusterversion.ByKey(
clusterversion.V23_1EnablePebbleFormatSSTableValueBlocks)) &&
valueBlocksEnabled.Get(&cfg.Settings.SV)
}

auxDir := cfg.Opts.FS.PathJoin(cfg.Dir, base.AuxiliaryDir)
if err := cfg.Opts.FS.MkdirAll(auxDir, 0755); err != nil {
auxDir := opts.FS.PathJoin(cfg.Dir, base.AuxiliaryDir)
if err := opts.FS.MkdirAll(auxDir, 0755); err != nil {
return nil, err
}
ballastPath := base.EmergencyBallastFile(cfg.Opts.FS.PathJoin, cfg.Dir)
ballastPath := base.EmergencyBallastFile(opts.FS.PathJoin, cfg.Dir)

// For some purposes, we want to always use an unencrypted
// filesystem. The call below to ResolveEncryptedEnvOptions will
// replace cfg.Opts.FS with a VFS wrapped with encryption-at-rest if
// necessary. Before we do that, save a handle on the unencrypted
// FS for those that need it. Some call sites need the unencrypted
// FS for the purpose of atomic renames.
unencryptedFS := cfg.Opts.FS
fileRegistry, env, err := ResolveEncryptedEnvOptions(&cfg)
// filesystem.
unencryptedFS := opts.FS
fileRegistry, encryptionEnv, err := ResolveEncryptedEnvOptions(&cfg.StorageConfig, opts.FS, opts.ReadOnly)
if err != nil {
return nil, err
}
if encryptionEnv != nil {
opts.FS = encryptionEnv.FS
}

// If no logger was passed, the previous call to `EnsureDefaults` on
// `cfg.Opts` will set the logger to pebble's `DefaultLogger`. In
// `opts` will set the logger to pebble's `DefaultLogger`. In
// crdb, we want pebble-related logs to go to the storage channel,
// so we update the logger here accordingly.
if cfg.Opts.Logger == nil || cfg.Opts.Logger == pebble.DefaultLogger {
cfg.Opts.Logger = pebbleLogger{
if opts.Logger == nil || opts.Logger == pebble.DefaultLogger {
opts.Logger = pebbleLogger{
ctx: logCtx,
depth: 1,
}
Expand All @@ -930,7 +931,7 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) {
// Establish the emergency ballast if we can. If there's not sufficient
// disk space, the ballast will be reestablished from Capacity when the
// store's capacity is queried periodically.
if !cfg.Opts.ReadOnly {
if !opts.ReadOnly {
du, err := unencryptedFS.GetDiskUsage(cfg.Dir)
// If the FS is an in-memory FS, GetDiskUsage returns
// vfs.ErrUnsupported and we skip ballast creation.
Expand All @@ -942,16 +943,16 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) {
return nil, errors.Wrap(err, "resizing ballast")
}
if resized {
cfg.Opts.Logger.Infof("resized ballast %s to size %s",
opts.Logger.Infof("resized ballast %s to size %s",
ballastPath, humanizeutil.IBytes(cfg.BallastSize))
}
}
}

storeProps := computeStoreProperties(ctx, cfg.Dir, cfg.Opts.ReadOnly, env != nil /* encryptionEnabled */)
storeProps := computeStoreProperties(ctx, cfg.Dir, opts.ReadOnly, encryptionEnv != nil /* encryptionEnabled */)

p = &Pebble{
readOnly: cfg.Opts.ReadOnly,
readOnly: opts.ReadOnly,
path: cfg.Dir,
auxDir: auxDir,
ballastPath: ballastPath,
Expand All @@ -960,11 +961,11 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) {
attrs: cfg.Attrs,
properties: storeProps,
settings: cfg.Settings,
encryption: env,
encryption: encryptionEnv,
fileRegistry: fileRegistry,
fs: cfg.Opts.FS,
fs: opts.FS,
unencryptedFS: unencryptedFS,
logger: cfg.Opts.Logger,
logger: opts.Logger,
logCtx: logCtx,
storeIDPebbleLog: storeIDContainer,
closer: filesystemCloser,
Expand All @@ -978,8 +979,8 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) {
// the compactions concurrency which has already been set and allow us
// to update the compactionConcurrency on the fly by changing the
// Pebble.atomic.compactionConcurrency variable.
p.atomic.compactionConcurrency = uint64(cfg.Opts.MaxConcurrentCompactions())
cfg.Opts.MaxConcurrentCompactions = func() int {
p.atomic.compactionConcurrency = uint64(opts.MaxConcurrentCompactions())
opts.MaxConcurrentCompactions = func() int {
return int(atomic.LoadUint64(&p.atomic.compactionConcurrency))
}

Expand Down Expand Up @@ -1015,7 +1016,7 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) {
)

p.eventListener = &el
cfg.Opts.EventListener = &el
opts.EventListener = &el
p.wrappedIntentWriter = wrapIntentWriter(p)

// Read the current store cluster version.
Expand Down Expand Up @@ -1051,10 +1052,10 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) {
}

if WorkloadCollectorEnabled {
p.replayer.Attach(cfg.Opts)
p.replayer.Attach(opts)
}

db, err := pebble.Open(cfg.StorageConfig.Dir, cfg.Opts)
db, err := pebble.Open(cfg.StorageConfig.Dir, opts)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/pebble_file_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ const (

// CheckNoRegistryFile checks that no registry file currently exists.
// CheckNoRegistryFile should be called if the file registry will not be used.
func (r *PebbleFileRegistry) CheckNoRegistryFile() error {
filename, err := atomicfs.ReadMarker(r.FS, r.DBDir, registryMarkerName)
func CheckNoRegistryFile(fs vfs.FS, dbDir string) error {
filename, err := atomicfs.ReadMarker(fs, dbDir, registryMarkerName)
if oserror.IsNotExist(err) {
// ReadMarker may return oserror.IsNotExist if the data
// directory does not exist.
Expand Down
12 changes: 4 additions & 8 deletions pkg/storage/pebble_file_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,11 @@ func TestFileRegistryCheckNoFile(t *testing.T) {
mem := vfs.NewMem()
fileEntry :=
&enginepb.FileEntry{EnvType: enginepb.EnvType_Data, EncryptionSettings: []byte("foo")}
require.NoError(t, CheckNoRegistryFile(mem, "" /* dbDir */))
registry := &PebbleFileRegistry{FS: mem}
require.NoError(t, registry.CheckNoRegistryFile())
require.NoError(t, registry.Load())
require.NoError(t, registry.SetFileEntry("/foo", fileEntry))
registry = &PebbleFileRegistry{FS: mem}
require.Error(t, registry.CheckNoRegistryFile())
require.Error(t, CheckNoRegistryFile(mem, "" /* dbDir */))
}

func TestFileRegistryElideUnencrypted(t *testing.T) {
Expand Down Expand Up @@ -293,8 +292,8 @@ func TestFileRegistryRecordsReadAndWrite(t *testing.T) {
}

// Create a file registry and add entries for a few files.
require.NoError(t, CheckNoRegistryFile(mem, "" /* dbDir */))
registry1 := &PebbleFileRegistry{FS: mem}
require.NoError(t, registry1.CheckNoRegistryFile())
require.NoError(t, registry1.Load())
for filename, entry := range files {
require.NoError(t, registry1.SetFileEntry(filename, entry))
Expand Down Expand Up @@ -332,10 +331,7 @@ func TestFileRegistry(t *testing.T) {
switch d.Cmd {
case "check-no-registry-file":
require.Nil(t, registry)
registry = &PebbleFileRegistry{FS: fs}
err := registry.CheckNoRegistryFile()
registry = nil
if err == nil {
if err := CheckNoRegistryFile(fs, "" /* dbDir */); err == nil {
fmt.Fprintf(&buf, "OK\n")
} else {
fmt.Fprintf(&buf, "Error: %s\n", err)
Expand Down

0 comments on commit f314232

Please sign in to comment.