Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
78987: cli: default to num procs concurrency in debug compact r=jbowens a=jbowens

**cli: remove OpenExistingStore**

Remove OpenExistingStore, replacing its call sites with calls to OpenEngine,
including the storage.MustExist config option.

**cli: default to num procs concurrency in debug compact**

Now that some concurrency within manual offline compactions is possible,
default to a max compaction concurrency equal to the number of
processors during offline manual compactions. Add a new
`--max-concurrency` flag to the debug compact command to override this
default.

The undocumented `COCKROACH_ROCKSDB_CONCURRENCY` environment variable
will no longer affect the maximum concurrency for this offline
compaction. That environment variable is intended for controlling
compaction concurrency for a running CockroachDB node. The switch to an
explicit flag for this command makes the concurrency control more
discoverable (it appears within the command help output) and allows for
the more reasonable default concurrency in this context.

Release note (cli change): Changes the default `debug compact`
maximum compaction concurrency to the number of processors, and adds a
`--max-concurrency` flag for overriding the new default.

Co-authored-by: Jackson Owens <[email protected]>
  • Loading branch information
craig[bot] and jbowens committed May 6, 2022
2 parents 98bdf32 + d4242a7 commit 40d8803
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 86 deletions.
3 changes: 2 additions & 1 deletion pkg/ccl/cliccl/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl/enginepbccl"
"github.com/cockroachdb/cockroach/pkg/cli"
"github.com/cockroachdb/cockroach/pkg/cli/clierrorplus"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
Expand Down Expand Up @@ -160,7 +161,7 @@ func runEncryptionStatus(cmd *cobra.Command, args []string) error {

dir := args[0]

db, err := cli.OpenExistingStore(dir, stopper, true /* readOnly */, false /* disableAutomaticCompactions */)
db, err := cli.OpenEngine(dir, stopper, storage.MustExist, storage.ReadOnly)
if err != nil {
return err
}
Expand Down
67 changes: 24 additions & 43 deletions pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"os"
"path/filepath"
"regexp"
"runtime"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -163,42 +164,11 @@ func (f *keyFormat) Type() string {
return "hex|base64"
}

// OpenEngineOptions tunes the behavior of OpenEngine.
type OpenEngineOptions struct {
ReadOnly bool
MustExist bool
DisableAutomaticCompactions bool
}

func (opts OpenEngineOptions) configOptions() []storage.ConfigOption {
var cfgOpts []storage.ConfigOption
if opts.ReadOnly {
cfgOpts = append(cfgOpts, storage.ReadOnly)
}
if opts.MustExist {
cfgOpts = append(cfgOpts, storage.MustExist)
}
if opts.DisableAutomaticCompactions {
cfgOpts = append(cfgOpts, storage.DisableAutomaticCompactions)
}
return cfgOpts
}

// OpenExistingStore opens the Pebble engine rooted at 'dir'. If 'readOnly' is
// true, opens the store in read-only mode. If 'disableAutomaticCompactions' is
// true, disables automatic/background compactions (only used for manual
// compactions).
func OpenExistingStore(
dir string, stopper *stop.Stopper, readOnly, disableAutomaticCompactions bool,
) (storage.Engine, error) {
return OpenEngine(dir, stopper, OpenEngineOptions{
ReadOnly: readOnly, MustExist: true, DisableAutomaticCompactions: disableAutomaticCompactions,
})
}

// OpenEngine opens the engine at 'dir'. Depending on the supplied options,
// an empty engine might be initialized.
func OpenEngine(dir string, stopper *stop.Stopper, opts OpenEngineOptions) (storage.Engine, error) {
func OpenEngine(
dir string, stopper *stop.Stopper, opts ...storage.ConfigOption,
) (storage.Engine, error) {
maxOpenFiles, err := server.SetOpenFileLimitForOneStore()
if err != nil {
return nil, err
Expand All @@ -209,7 +179,7 @@ func OpenEngine(dir string, stopper *stop.Stopper, opts OpenEngineOptions) (stor
storage.CacheSize(server.DefaultCacheSize),
storage.Settings(serverCfg.Settings),
storage.Hook(PopulateStorageConfigHook),
storage.CombineOptions(opts.configOptions()...))
storage.CombineOptions(opts...))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -281,7 +251,7 @@ func runDebugKeys(cmd *cobra.Command, args []string) error {
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())

db, err := OpenExistingStore(args[0], stopper, true /* readOnly */, false /* disableAutomaticCompactions */)
db, err := OpenEngine(args[0], stopper, storage.MustExist, storage.ReadOnly)
if err != nil {
return err
}
Expand Down Expand Up @@ -454,7 +424,7 @@ func runDebugRangeData(cmd *cobra.Command, args []string) error {
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())

db, err := OpenExistingStore(args[0], stopper, true /* readOnly */, false /* disableAutomaticCompactions */)
db, err := OpenEngine(args[0], stopper, storage.ReadOnly, storage.MustExist)
if err != nil {
return err
}
Expand Down Expand Up @@ -544,7 +514,7 @@ func runDebugRangeDescriptors(cmd *cobra.Command, args []string) error {
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())

db, err := OpenExistingStore(args[0], stopper, true /* readOnly */, false /* disableAutomaticCompactions */)
db, err := OpenEngine(args[0], stopper, storage.ReadOnly, storage.MustExist)
if err != nil {
return err
}
Expand Down Expand Up @@ -687,7 +657,7 @@ func runDebugRaftLog(cmd *cobra.Command, args []string) error {
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())

db, err := OpenExistingStore(args[0], stopper, true /* readOnly */, false /* disableAutomaticCompactions */)
db, err := OpenEngine(args[0], stopper, storage.ReadOnly, storage.MustExist)
if err != nil {
return err
}
Expand Down Expand Up @@ -757,7 +727,7 @@ func runDebugGCCmd(cmd *cobra.Command, args []string) error {
}
}

db, err := OpenExistingStore(args[0], stopper, true /* readOnly */, false /* disableAutomaticCompactions */)
db, err := OpenEngine(args[0], stopper, storage.ReadOnly, storage.MustExist)
if err != nil {
return err
}
Expand Down Expand Up @@ -841,6 +811,10 @@ Output environment variables that influence configuration.
},
}

var debugCompactOpts = struct {
maxConcurrency int
}{maxConcurrency: runtime.GOMAXPROCS(0)}

var debugCompactCmd = &cobra.Command{
Use: "compact <directory>",
Short: "compact the sstables in a store",
Expand All @@ -855,7 +829,10 @@ func runDebugCompact(cmd *cobra.Command, args []string) error {
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())

db, err := OpenExistingStore(args[0], stopper, false /* readOnly */, true /* disableAutomaticCompactions */)
db, err := OpenEngine(args[0], stopper,
storage.MustExist,
storage.DisableAutomaticCompactions,
storage.MaxConcurrentCompactions(debugCompactOpts.maxConcurrency))
if err != nil {
return err
}
Expand Down Expand Up @@ -1123,7 +1100,7 @@ func runDebugUnsafeRemoveDeadReplicas(cmd *cobra.Command, args []string) error {
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())

db, err := OpenExistingStore(args[0], stopper, false /* readOnly */, false /* disableAutomaticCompactions */)
db, err := OpenEngine(args[0], stopper, storage.MustExist)
if err != nil {
return err
}
Expand Down Expand Up @@ -1494,7 +1471,7 @@ func runDebugIntentCount(cmd *cobra.Command, args []string) error {
ctx := context.Background()
defer stopper.Stop(ctx)

db, err := OpenExistingStore(args[0], stopper, true /* readOnly */, false /* disableAutomaticCompactions */)
db, err := OpenEngine(args[0], stopper, storage.MustExist, storage.ReadOnly)
if err != nil {
return err
}
Expand Down Expand Up @@ -1708,6 +1685,10 @@ func init() {
f.BoolVarP(&syncBenchOpts.LogOnly, "log-only", "l", syncBenchOpts.LogOnly,
"only write to the WAL, not to sstables")

f = debugCompactCmd.Flags()
f.IntVarP(&debugCompactOpts.maxConcurrency, "max-concurrency", "c", debugCompactOpts.maxConcurrency,
"maximum number of concurrent compactions")

f = debugUnsafeRemoveDeadReplicasCmd.Flags()
f.IntSliceVar(&removeDeadReplicasOpts.deadStoreIDs, "dead-store-ids", nil,
"list of dead store IDs")
Expand Down
4 changes: 2 additions & 2 deletions pkg/cli/debug_check_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func checkStoreRangeStats(
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

eng, err := OpenExistingStore(dir, stopper, true /* readOnly */, false /* disableAutomaticCompactions */)
eng, err := OpenEngine(dir, stopper, storage.MustExist, storage.ReadOnly)
if err != nil {
return err
}
Expand Down Expand Up @@ -220,7 +220,7 @@ func checkStoreRaftState(
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())

db, err := OpenExistingStore(dir, stopper, true /* readOnly */, false /* disableAutomaticCompactions */)
db, err := OpenEngine(dir, stopper, storage.MustExist, storage.ReadOnly)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/cli/debug_recover_loss_of_quorum.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func runDebugDeadReplicaCollect(cmd *cobra.Command, args []string) error {

var stores []storage.Engine
for _, storeSpec := range debugRecoverCollectInfoOpts.Stores.Specs {
db, err := OpenExistingStore(storeSpec.Path, stopper, true /* readOnly */, false /* disableAutomaticCompactions */)
db, err := OpenEngine(storeSpec.Path, stopper, storage.MustExist, storage.ReadOnly)
if err != nil {
return errors.Wrapf(err, "failed to open store at path %q, ensure that store path is "+
"correct and that it is not used by another process", storeSpec.Path)
Expand Down Expand Up @@ -457,7 +457,7 @@ func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error {
var localNodeID roachpb.NodeID
batches := make(map[roachpb.StoreID]storage.Batch)
for _, storeSpec := range debugRecoverExecuteOpts.Stores.Specs {
store, err := OpenExistingStore(storeSpec.Path, stopper, false /* readOnly */, false /* disableAutomaticCompactions */)
store, err := OpenEngine(storeSpec.Path, stopper, storage.MustExist)
if err != nil {
return errors.Wrapf(err, "failed to open store at path %q. ensure that store path is "+
"correct and that it is not used by another process", storeSpec.Path)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/debug_synctest.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func runSyncer(
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

db, err := OpenEngine(dir, stopper, OpenEngineOptions{})
db, err := OpenEngine(dir, stopper)
if err != nil {
if expSeq == 0 {
// Failed on first open, before we tried to corrupt anything. Hard stop.
Expand Down
43 changes: 6 additions & 37 deletions pkg/cli/debug_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,41 +53,6 @@ func createStore(t *testing.T, path string) {
db.Close()
}

func TestOpenExistingStore(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())

baseDir, dirCleanupFn := testutils.TempDir(t)
defer dirCleanupFn()

dirExists := filepath.Join(baseDir, "exists")
dirMissing := filepath.Join(baseDir, "missing")
createStore(t, dirExists)

for _, test := range []struct {
dir string
expErr string
}{
{
dir: dirExists,
expErr: "",
},
{
dir: dirMissing,
expErr: `does not exist|no such file or directory`,
},
} {
t.Run(fmt.Sprintf("dir=%s", test.dir), func(t *testing.T) {
_, err := OpenExistingStore(test.dir, stopper, false /* readOnly */, false /* disableAutomaticCompactions */)
if !testutils.IsError(err, test.expErr) {
t.Errorf("wanted %s but got %v", test.expErr, err)
}
})
}
}

func TestOpenReadOnlyStore(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand All @@ -114,7 +79,11 @@ func TestOpenReadOnlyStore(t *testing.T) {
},
} {
t.Run(fmt.Sprintf("readOnly=%t", test.readOnly), func(t *testing.T) {
db, err := OpenExistingStore(storePath, stopper, test.readOnly, false /* disableAutomaticCompactions */)
var opts []storage.ConfigOption
if test.readOnly {
opts = append(opts, storage.ReadOnly)
}
db, err := OpenEngine(storePath, stopper, opts...)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -255,7 +224,7 @@ func TestRemoveDeadReplicas(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

db, err := OpenExistingStore(storePaths[idx], stopper, false /* readOnly */, false /* disableAutomaticCompactions */)
db, err := OpenEngine(storePaths[idx], stopper, storage.MustExist)
if err != nil {
return err
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/storage/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ func CacheSize(size int64) ConfigOption {
}
}

// MaxConcurrentCompactions configures the maximum number of concurrent
// compactions an Engine will execute.
func MaxConcurrentCompactions(n int) ConfigOption {
return func(cfg *engineConfig) error {
cfg.Opts.MaxConcurrentCompactions = n
return nil
}
}

// EncryptionAtRest configures an engine to use encryption-at-rest. It is used
// for configuring in-memory engines, which are used in tests. It is not safe
// to modify the given slice afterwards as it is captured by reference.
Expand Down

0 comments on commit 40d8803

Please sign in to comment.