diff --git a/DEPS.bzl b/DEPS.bzl index e7d0ee83a07e..e8db32e4d293 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -1431,10 +1431,10 @@ def go_deps(): patches = [ "@com_github_cockroachdb_cockroach//build/patches:com_github_cockroachdb_pebble.patch", ], - sha256 = "08c72d014d11e94dba62660d804f68676b5dbf2172305ae013a4864c5fd30d2e", - strip_prefix = "github.com/cockroachdb/pebble@v0.0.0-20220813213520-4c5e752da1b8", + sha256 = "855751657ff6d4aa229474e378abb4417b7ece06ca08f1f12ed842553f372497", + strip_prefix = "github.com/cockroachdb/pebble@v0.0.0-20220815175543-94bbf14378db", urls = [ - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20220813213520-4c5e752da1b8.zip", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20220815175543-94bbf14378db.zip", ], ) go_repository( diff --git a/build/bazelutil/distdir_files.bzl b/build/bazelutil/distdir_files.bzl index 59174518ed57..f7e6f4acdbbf 100644 --- a/build/bazelutil/distdir_files.bzl +++ b/build/bazelutil/distdir_files.bzl @@ -187,7 +187,7 @@ DISTDIR_FILES = { "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/go-test-teamcity/com_github_cockroachdb_go_test_teamcity-v0.0.0-20191211140407-cff980ad0a55.zip": "bac30148e525b79d004da84d16453ddd2d5cd20528e9187f1d7dac708335674b", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/gostdlib/com_github_cockroachdb_gostdlib-v1.13.0.zip": "b3d43d8f95edf65f73a5348f29e1159823cac64b148f8d3bb48340bf55d70872", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/logtags/com_github_cockroachdb_logtags-v0.0.0-20211118104740-dabe8e521a4f.zip": "1972c3f171f118add3fd9e64bcea6cbb9959a3b7fa0ada308e8a7310813fea74", - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20220813213520-4c5e752da1b8.zip": "08c72d014d11e94dba62660d804f68676b5dbf2172305ae013a4864c5fd30d2e", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20220815175543-94bbf14378db.zip": "855751657ff6d4aa229474e378abb4417b7ece06ca08f1f12ed842553f372497", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/redact/com_github_cockroachdb_redact-v1.1.3.zip": "7778b1e4485e4f17f35e5e592d87eb99c29e173ac9507801d000ad76dd0c261e", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/returncheck/com_github_cockroachdb_returncheck-v0.0.0-20200612231554-92cdbca611dd.zip": "ce92ba4352deec995b1f2eecf16eba7f5d51f5aa245a1c362dfe24c83d31f82b", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/sentry-go/com_github_cockroachdb_sentry_go-v0.6.1-cockroachdb.2.zip": "fbb2207d02aecfdd411b1357efe1192dbb827959e36b7cab7491731ac55935c9", diff --git a/go.mod b/go.mod index f959bd0a0ab5..689503e824d3 100644 --- a/go.mod +++ b/go.mod @@ -47,7 +47,7 @@ require ( github.com/cockroachdb/go-test-teamcity v0.0.0-20191211140407-cff980ad0a55 github.com/cockroachdb/gostdlib v1.13.0 github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f - github.com/cockroachdb/pebble v0.0.0-20220813213520-4c5e752da1b8 + github.com/cockroachdb/pebble v0.0.0-20220815175543-94bbf14378db github.com/cockroachdb/redact v1.1.3 github.com/cockroachdb/returncheck v0.0.0-20200612231554-92cdbca611dd github.com/cockroachdb/stress v0.0.0-20220803192808-1806698b1b7b diff --git a/go.sum b/go.sum index 6a24e38b3c43..080f5e80f332 100644 --- a/go.sum +++ b/go.sum @@ -467,8 +467,8 @@ github.com/cockroachdb/gostdlib v1.13.0/go.mod h1:eXX95p9QDrYwJfJ6AgeN9QnRa/lqqi github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f/go.mod h1:i/u985jwjWRlyHXQbwatDASoW0RMlZ/3i9yJHE2xLkI= github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f h1:6jduT9Hfc0njg5jJ1DdKCFPdMBrp/mdZfCpa5h+WM74= github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs= -github.com/cockroachdb/pebble v0.0.0-20220813213520-4c5e752da1b8 h1:on483Ok6H4Bh4BHrSc+LKlmqouIj8L/f9+U8t1bVKcs= -github.com/cockroachdb/pebble v0.0.0-20220813213520-4c5e752da1b8/go.mod h1:890yq1fUb9b6dGNwssgeUO5vQV9qfXnCPxAJhBQfXw0= +github.com/cockroachdb/pebble v0.0.0-20220815175543-94bbf14378db h1:1Yk1skUQZ4Owkq6kHXJHvmgi0ao58jLS3615GONojwk= +github.com/cockroachdb/pebble v0.0.0-20220815175543-94bbf14378db/go.mod h1:890yq1fUb9b6dGNwssgeUO5vQV9qfXnCPxAJhBQfXw0= github.com/cockroachdb/redact v1.0.8/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= github.com/cockroachdb/redact v1.1.3 h1:AKZds10rFSIj7qADf0g46UixK8NNLwWTNdCIGS5wfSQ= github.com/cockroachdb/redact v1.1.3/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index e39af08771b1..0ecbdb630aa5 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -8,7 +8,6 @@ go_library( name = "kvserver", srcs = [ "addressing.go", - "compact_span_client.go", "consistency_queue.go", "debug_print.go", "doc.go", @@ -79,6 +78,7 @@ go_library( "split_delay_helper.go", "split_queue.go", "split_trigger_helper.go", + "storage_engine_client.go", "store.go", "store_create_replica.go", "store_init.go", diff --git a/pkg/kv/kvserver/api.proto b/pkg/kv/kvserver/api.proto index c2ba71565d7e..9c50cd5ce62c 100644 --- a/pkg/kv/kvserver/api.proto +++ b/pkg/kv/kvserver/api.proto @@ -85,3 +85,13 @@ message CompactEngineSpanRequest { message CompactEngineSpanResponse { } + +// CompactionConcurrencyRequest increases the compaction concurrency of the store +// until the request is cancelled. +message CompactionConcurrencyRequest { + StoreRequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true]; + uint64 compaction_concurrency = 2; +} + +message CompactionConcurrencyResponse { +} diff --git a/pkg/kv/kvserver/compact_span_client.go b/pkg/kv/kvserver/storage_engine_client.go similarity index 54% rename from pkg/kv/kvserver/compact_span_client.go rename to pkg/kv/kvserver/storage_engine_client.go index bf0ab1947bf0..c974498a3e5a 100644 --- a/pkg/kv/kvserver/compact_span_client.go +++ b/pkg/kv/kvserver/storage_engine_client.go @@ -19,19 +19,18 @@ import ( "github.com/cockroachdb/errors" ) -// CompactEngineSpanClient is used to request compaction for a span -// of data on a store. -type CompactEngineSpanClient struct { +// StorageEngineClient is used to connect and make requests to a store. +type StorageEngineClient struct { nd *nodedialer.Dialer } -// NewCompactEngineSpanClient constructs a new CompactEngineSpanClient. -func NewCompactEngineSpanClient(nd *nodedialer.Dialer) *CompactEngineSpanClient { - return &CompactEngineSpanClient{nd: nd} +// NewStorageEngineClient constructs a new StorageEngineClient. +func NewStorageEngineClient(nd *nodedialer.Dialer) *StorageEngineClient { + return &StorageEngineClient{nd: nd} } // CompactEngineSpan is a tree.CompactEngineSpanFunc. -func (c *CompactEngineSpanClient) CompactEngineSpan( +func (c *StorageEngineClient) CompactEngineSpan( ctx context.Context, nodeID, storeID int32, startKey, endKey []byte, ) error { conn, err := c.nd.Dial(ctx, roachpb.NodeID(nodeID), rpc.DefaultClass) @@ -49,3 +48,26 @@ func (c *CompactEngineSpanClient) CompactEngineSpan( _, err = client.CompactEngineSpan(ctx, req) return err } + +// SetCompactionConcurrency is a tree.CompactionConcurrencyFunc. +func (c *StorageEngineClient) SetCompactionConcurrency( + ctx context.Context, nodeID, storeID int32, compactionConcurrency uint64, +) error { + conn, err := c.nd.Dial(ctx, roachpb.NodeID(nodeID), rpc.DefaultClass) + if err != nil { + return errors.Wrapf(err, "could not dial node ID %d", nodeID) + } + client := NewPerStoreClient(conn) + req := &CompactionConcurrencyRequest{ + StoreRequestHeader: StoreRequestHeader{ + NodeID: roachpb.NodeID(nodeID), + StoreID: roachpb.StoreID(storeID), + }, + CompactionConcurrency: compactionConcurrency, + } + _, err = client.SetCompactionConcurrency(ctx, req) + if err != nil { + return err + } + return nil +} diff --git a/pkg/kv/kvserver/storage_services.proto b/pkg/kv/kvserver/storage_services.proto index 4f24f423fdc7..0cdfbdd0002c 100644 --- a/pkg/kv/kvserver/storage_services.proto +++ b/pkg/kv/kvserver/storage_services.proto @@ -39,4 +39,5 @@ service PerReplica { service PerStore { rpc CompactEngineSpan(cockroach.kv.kvserver.CompactEngineSpanRequest) returns (cockroach.kv.kvserver.CompactEngineSpanResponse) {} + rpc SetCompactionConcurrency(cockroach.kv.kvserver.CompactionConcurrencyRequest) returns (cockroach.kv.kvserver.CompactionConcurrencyResponse) {} } diff --git a/pkg/kv/kvserver/stores_server.go b/pkg/kv/kvserver/stores_server.go index e96776959c2a..73fd82dfe8c8 100644 --- a/pkg/kv/kvserver/stores_server.go +++ b/pkg/kv/kvserver/stores_server.go @@ -166,3 +166,24 @@ func (is Server) CompactEngineSpan( }) return resp, err } + +// SetCompactionConcurrency implements PerStoreServer. It changes the compaction +// concurrency of a store. While SetCompactionConcurrency is safe for concurrent +// use, it adds uncertainty about the compaction concurrency actually set on +// the store. It also adds uncertainty about the compaction concurrency set on +// the store once the request is cancelled. +func (is Server) SetCompactionConcurrency( + ctx context.Context, req *CompactionConcurrencyRequest, +) (*CompactionConcurrencyResponse, error) { + resp := &CompactionConcurrencyResponse{} + err := is.execStoreCommand(ctx, req.StoreRequestHeader, + func(ctx context.Context, s *Store) error { + prevConcurrency := s.Engine().SetCompactionConcurrency(req.CompactionConcurrency) + + // Wait for cancellation, and once cancelled, reset the compaction concurrency. + <-ctx.Done() + s.Engine().SetCompactionConcurrency(prevConcurrency) + return nil + }) + return resp, err +} diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index ab85cc4c817a..a098516f1ae5 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -233,7 +233,6 @@ go_library( "//pkg/util/buildutil", "//pkg/util/contextutil", "//pkg/util/envutil", - "//pkg/util/errorutil", "//pkg/util/goschedstats", "//pkg/util/grpcutil", "//pkg/util/hlc", diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index ef763d46954d..23c06671689c 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -105,7 +105,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/envutil" - "github.com/cockroachdb/cockroach/pkg/util/errorutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" @@ -578,18 +577,6 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { gcJobNotifier := gcjobnotifier.New(cfg.Settings, cfg.systemConfigWatcher, codec, cfg.stopper) - var compactEngineSpanFunc eval.CompactEngineSpanFunc - if !codec.ForSystemTenant() { - compactEngineSpanFunc = func( - ctx context.Context, nodeID, storeID int32, startKey, endKey []byte, - ) error { - return errorutil.UnsupportedWithMultiTenancy(errorutil.FeatureNotAvailableToNonSystemTenantsIssue) - } - } else { - cli := kvserver.NewCompactEngineSpanClient(cfg.nodeDialer) - compactEngineSpanFunc = cli.CompactEngineSpan - } - spanConfig := struct { manager *spanconfigmanager.Manager sqlTranslatorFactory *spanconfigsqltranslator.Factory @@ -761,43 +748,46 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { ) contentionRegistry.Start(ctx, cfg.stopper) + storageEngineClient := kvserver.NewStorageEngineClient(cfg.nodeDialer) + *execCfg = sql.ExecutorConfig{ - Settings: cfg.Settings, - NodeInfo: nodeInfo, - Codec: codec, - DefaultZoneConfig: &cfg.DefaultZoneConfig, - Locality: cfg.Locality, - AmbientCtx: cfg.AmbientCtx, - DB: cfg.db, - Gossip: cfg.gossip, - NodeLiveness: cfg.nodeLiveness, - SystemConfig: cfg.systemConfigWatcher, - MetricsRecorder: cfg.recorder, - DistSender: cfg.distSender, - RPCContext: cfg.rpcContext, - LeaseManager: leaseMgr, - TenantStatusServer: cfg.tenantStatusServer, - Clock: cfg.clock, - DistSQLSrv: distSQLServer, - NodesStatusServer: cfg.nodesStatusServer, - SQLStatusServer: cfg.sqlStatusServer, - RegionsServer: cfg.regionsServer, - SessionRegistry: cfg.sessionRegistry, - ClosedSessionCache: cfg.closedSessionCache, - ContentionRegistry: contentionRegistry, - SQLLiveness: cfg.sqlLivenessProvider, - JobRegistry: jobRegistry, - VirtualSchemas: virtualSchemas, - HistogramWindowInterval: cfg.HistogramWindowInterval(), - RangeDescriptorCache: cfg.distSender.RangeDescriptorCache(), - RoleMemberCache: sql.NewMembershipCache(serverCacheMemoryMonitor.MakeBoundAccount(), cfg.stopper), - SessionInitCache: sessioninit.NewCache(serverCacheMemoryMonitor.MakeBoundAccount(), cfg.stopper), - RootMemoryMonitor: rootSQLMemoryMonitor, - TestingKnobs: sqlExecutorTestingKnobs, - CompactEngineSpanFunc: compactEngineSpanFunc, - TraceCollector: traceCollector, - TenantUsageServer: cfg.tenantUsageServer, - KVStoresIterator: cfg.kvStoresIterator, + Settings: cfg.Settings, + NodeInfo: nodeInfo, + Codec: codec, + DefaultZoneConfig: &cfg.DefaultZoneConfig, + Locality: cfg.Locality, + AmbientCtx: cfg.AmbientCtx, + DB: cfg.db, + Gossip: cfg.gossip, + NodeLiveness: cfg.nodeLiveness, + SystemConfig: cfg.systemConfigWatcher, + MetricsRecorder: cfg.recorder, + DistSender: cfg.distSender, + RPCContext: cfg.rpcContext, + LeaseManager: leaseMgr, + TenantStatusServer: cfg.tenantStatusServer, + Clock: cfg.clock, + DistSQLSrv: distSQLServer, + NodesStatusServer: cfg.nodesStatusServer, + SQLStatusServer: cfg.sqlStatusServer, + RegionsServer: cfg.regionsServer, + SessionRegistry: cfg.sessionRegistry, + ClosedSessionCache: cfg.closedSessionCache, + ContentionRegistry: contentionRegistry, + SQLLiveness: cfg.sqlLivenessProvider, + JobRegistry: jobRegistry, + VirtualSchemas: virtualSchemas, + HistogramWindowInterval: cfg.HistogramWindowInterval(), + RangeDescriptorCache: cfg.distSender.RangeDescriptorCache(), + RoleMemberCache: sql.NewMembershipCache(serverCacheMemoryMonitor.MakeBoundAccount(), cfg.stopper), + SessionInitCache: sessioninit.NewCache(serverCacheMemoryMonitor.MakeBoundAccount(), cfg.stopper), + RootMemoryMonitor: rootSQLMemoryMonitor, + TestingKnobs: sqlExecutorTestingKnobs, + CompactEngineSpanFunc: storageEngineClient.CompactEngineSpan, + CompactionConcurrencyFunc: storageEngineClient.SetCompactionConcurrency, + TraceCollector: traceCollector, + TenantUsageServer: cfg.tenantUsageServer, + KVStoresIterator: cfg.kvStoresIterator, SyntheticPrivilegeCache: cacheutil.NewCache( serverCacheMemoryMonitor.MakeBoundAccount(), cfg.stopper, 1 /* numSystemTables */), diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 1a31c6309830..f663ebe74746 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1287,6 +1287,10 @@ type ExecutorConfig struct { // perform compaction over a key span. CompactEngineSpanFunc eval.CompactEngineSpanFunc + // CompactionConcurrencyFunc is used to inform a storage engine to change its + // compaction concurrency. + CompactionConcurrencyFunc eval.SetCompactionConcurrencyFunc + // TraceCollector is used to contact all live nodes in the cluster, and // collect trace spans from their inflight node registries. TraceCollector *collector.TraceCollector diff --git a/pkg/sql/logictest/testdata/logic_test/pg_catalog b/pkg/sql/logictest/testdata/logic_test/pg_catalog index 5aa73fd783fb..53829f9ac1aa 100644 --- a/pkg/sql/logictest/testdata/logic_test/pg_catalog +++ b/pkg/sql/logictest/testdata/logic_test/pg_catalog @@ -4650,7 +4650,7 @@ FROM pg_proc p JOIN pg_type t ON t.typinput = p.oid WHERE t.typname = '_int4' ---- -2012 array_in array_in +2013 array_in array_in ## #16285 ## int2vectors should be 0-indexed @@ -4688,7 +4688,7 @@ SELECT cur_max_builtin_oid FROM [SELECT max(oid) as cur_max_builtin_oid FROM pg_ query TT SELECT proname, oid FROM pg_catalog.pg_proc WHERE oid = $cur_max_builtin_oid ---- -to_regtype 2032 +to_regtype 2033 ## Ensure that unnest works with oid wrapper arrays diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index 3f90790473d5..302272fd8617 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -119,6 +119,7 @@ func (evalCtx *extendedEvalContext) copyFromExecCfg(execCfg *ExecutorConfig) { evalCtx.Tracer = execCfg.AmbientCtx.Tracer evalCtx.SQLLivenessReader = execCfg.SQLLiveness evalCtx.CompactEngineSpan = execCfg.CompactEngineSpanFunc + evalCtx.SetCompactionConcurrency = execCfg.CompactionConcurrencyFunc evalCtx.TestingKnobs = execCfg.EvalContextTestingKnobs evalCtx.ClusterID = execCfg.NodeInfo.LogicalClusterID() evalCtx.ClusterName = execCfg.RPCContext.ClusterName() diff --git a/pkg/sql/sem/builtins/builtins.go b/pkg/sql/sem/builtins/builtins.go index b7eca29fcf7c..f270f11b7232 100644 --- a/pkg/sql/sem/builtins/builtins.go +++ b/pkg/sql/sem/builtins/builtins.go @@ -7205,6 +7205,50 @@ that has execution latency greater than the 'minExecutionLatency'. If the expires until the statement bundle is collected`, }, ), + + "crdb_internal.set_compaction_concurrency": makeBuiltin( + tree.FunctionProperties{ + Category: builtinconstants.CategorySystemRepair, + DistsqlBlocklist: true, + Undocumented: true, + }, + tree.Overload{ + Types: tree.ArgTypes{ + {"node_id", types.Int}, + {"store_id", types.Int}, + {"compaction_concurrency", types.Int}, + }, + ReturnType: tree.FixedReturnType(types.Bool), + Fn: func(ctx *eval.Context, args tree.Datums) (tree.Datum, error) { + isAdmin, err := ctx.SessionAccessor.HasAdminRole(ctx.Context) + if err != nil { + return nil, err + } + if !isAdmin { + return nil, errInsufficientPriv + } + nodeID := int32(tree.MustBeDInt(args[0])) + storeID := int32(tree.MustBeDInt(args[1])) + compactionConcurrency := tree.MustBeDInt(args[2]) + if compactionConcurrency <= 0 { + return nil, errors.AssertionFailedf("compaction_concurrency must be > 0") + } + if err = ctx.SetCompactionConcurrency( + ctx.Context, nodeID, storeID, uint64(compactionConcurrency)); err != nil { + return nil, err + } + return tree.DBoolTrue, nil + }, + Info: "This function can be used to temporarily change the compaction concurrency of a " + + "given node and store. " + + "To change the compaction concurrency of a store one can do: " + + "SELECT crdb_internal.set_compaction_concurrency(, , ). " + + "The store's compaction concurrency will change until the sql command is cancelled. Once cancelled " + + "the store's compaction concurrency will return to what it was previously. This command isn't safe " + + "for concurrent use.", + Volatility: volatility.Volatile, + }, + ), } var lengthImpls = func(incBitOverload bool) builtinDefinition { diff --git a/pkg/sql/sem/eval/context.go b/pkg/sql/sem/eval/context.go index 9024a21cf6b1..666328012ec0 100644 --- a/pkg/sql/sem/eval/context.go +++ b/pkg/sql/sem/eval/context.go @@ -198,6 +198,10 @@ type Context struct { // CompactEngineSpan is used to force compaction of a span in a store. CompactEngineSpan CompactEngineSpanFunc + // SetCompactionConcurrency is used to change the compaction concurrency of + // a store. + SetCompactionConcurrency SetCompactionConcurrencyFunc + // KVStoresIterator is used by various crdb_internal builtins to directly // access stores on this node. KVStoresIterator kvserverbase.StoresIterator diff --git a/pkg/sql/sem/eval/deps.go b/pkg/sql/sem/eval/deps.go index fa8f787c49b4..e2e15a3a22ff 100644 --- a/pkg/sql/sem/eval/deps.go +++ b/pkg/sql/sem/eval/deps.go @@ -384,6 +384,12 @@ type CompactEngineSpanFunc func( ctx context.Context, nodeID, storeID int32, startKey, endKey []byte, ) error +// SetCompactionConcurrencyFunc is used to change the compaction concurrency of a +// store. +type SetCompactionConcurrencyFunc func( + ctx context.Context, nodeID, storeID int32, compactionConcurrency uint64, +) error + // SessionAccessor is a limited interface to access session variables. type SessionAccessor interface { // SetSessionVar sets a session variable to a new value. If isLocal is true, diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index 647dda6d2631..700fa1fa360c 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -945,6 +945,10 @@ type Engine interface { // calls to this method. Hence, this should be used with care, with only one // caller, which is currently the admission control subsystem. GetInternalIntervalMetrics() *pebble.InternalIntervalMetrics + + // SetCompactionConcurrency is used to set the engine's compaction + // concurrency. It returns the previous compaction concurrency. + SetCompactionConcurrency(n uint64) uint64 } // Batch is the interface for batch specific operations. diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index cc3011ae933a..e4abac5b1752 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -646,6 +646,17 @@ type EncryptionStatsHandler interface { // Pebble is a wrapper around a Pebble database instance. type Pebble struct { + atomic struct { + // compactionConcurrency is the current compaction concurrency set on + // the Pebble store. The compactionConcurrency option in the Pebble + // Options struct is a closure which will return + // Pebble.atomic.compactionConcurrency. + // + // This mechanism allows us to change the Pebble compactionConcurrency + // on the fly without restarting Pebble. + compactionConcurrency uint64 + } + db *pebble.DB closed bool @@ -727,6 +738,12 @@ type StoreIDSetter interface { SetStoreID(ctx context.Context, storeID int32) } +// SetCompactionConcurrency will return the previous compaction concurrency. +func (p *Pebble) SetCompactionConcurrency(n uint64) uint64 { + prevConcurrency := atomic.SwapUint64(&p.atomic.compactionConcurrency, n) + return prevConcurrency +} + // SetStoreID adds the store id to pebble logs. func (p *Pebble) SetStoreID(ctx context.Context, storeID int32) { if p == nil { @@ -886,6 +903,19 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) { storeIDPebbleLog: storeIDContainer, closer: filesystemCloser, } + + // MaxConcurrentCompactions can be set by multiple sources, but all the + // sources will eventually call NewPebble. So, we override + // Opts.MaxConcurrentCompactions to a closure which will return + // Pebble.atomic.compactionConcurrency. This will allow us to both honor + // the compactions concurrency which has already been set and allow us + // to update the compactionConcurrency on the fly by changing the + // Pebble.atomic.compactionConcurrency variable. + p.atomic.compactionConcurrency = uint64(cfg.Opts.MaxConcurrentCompactions()) + cfg.Opts.MaxConcurrentCompactions = func() int { + return int(atomic.LoadUint64(&p.atomic.compactionConcurrency)) + } + cfg.Opts.EventListener = pebble.TeeEventListener( pebble.MakeLoggingEventListener(pebbleLogger{ ctx: logCtx, diff --git a/pkg/ui/workspaces/db-console/src/redux/localsettings.ts b/pkg/ui/workspaces/db-console/src/redux/localsettings.ts index 5a72a80912f7..9a29b15390d7 100644 --- a/pkg/ui/workspaces/db-console/src/redux/localsettings.ts +++ b/pkg/ui/workspaces/db-console/src/redux/localsettings.ts @@ -27,7 +27,7 @@ import { call, takeEvery } from "redux-saga/effects"; import { PayloadAction } from "src/interfaces/action"; const STORAGE_PREFIX = "cockroachui"; -const SET_UI_VALUE = `${STORAGE_PREFIX}/ui/SET_UI_VALUE`; +export const SET_UI_VALUE = `${STORAGE_PREFIX}/ui/SET_UI_VALUE`; export interface LocalSettingData { key: string; @@ -61,7 +61,7 @@ function saveToSessionStorage(data: LocalSettingData) { * Retrieve local setting value by key from sessionStorage. * Value is stored as a stringified JSON so has to be parsed back. */ -function getValueFromSessionStorage(key: string) { +export function getValueFromSessionStorage(key: string) { const value = sessionStorage.getItem(`${STORAGE_PREFIX}/${key}`); return JSON.parse(value); } diff --git a/pkg/ui/workspaces/db-console/src/redux/sagas.ts b/pkg/ui/workspaces/db-console/src/redux/sagas.ts index 69c43fe9bf0f..6451ffbd884c 100644 --- a/pkg/ui/workspaces/db-console/src/redux/sagas.ts +++ b/pkg/ui/workspaces/db-console/src/redux/sagas.ts @@ -18,6 +18,7 @@ import { analyticsSaga } from "./analyticsSagas"; import { sessionsSaga } from "./sessions"; import { sqlStatsSaga } from "./sqlStats"; import { indexUsageStatsSaga } from "./indexUsageStats"; +import { timeScaleSaga } from "src/redux/timeScale"; export default function* rootSaga() { yield all([ @@ -29,5 +30,6 @@ export default function* rootSaga() { fork(sessionsSaga), fork(sqlStatsSaga), fork(indexUsageStatsSaga), + fork(timeScaleSaga), ]); } diff --git a/pkg/ui/workspaces/db-console/src/redux/timeScale.ts b/pkg/ui/workspaces/db-console/src/redux/timeScale.ts index 7c4e95813a66..8f16c66bb95a 100644 --- a/pkg/ui/workspaces/db-console/src/redux/timeScale.ts +++ b/pkg/ui/workspaces/db-console/src/redux/timeScale.ts @@ -14,18 +14,24 @@ */ import { Action } from "redux"; +import { put, takeEvery } from "redux-saga/effects"; import { PayloadAction } from "src/interfaces/action"; import _ from "lodash"; import { defaultTimeScaleOptions, TimeScale } from "@cockroachlabs/cluster-ui"; import moment from "moment"; import { createSelector } from "reselect"; import { AdminUIState } from "src/redux/state"; +import { + getValueFromSessionStorage, + setLocalSetting, +} from "src/redux/localsettings"; export const SET_SCALE = "cockroachui/timewindow/SET_SCALE"; export const SET_METRICS_MOVING_WINDOW = "cockroachui/timewindow/SET_METRICS_MOVING_WINDOW"; export const SET_METRICS_FIXED_WINDOW = "cockroachui/timewindow/SET_METRICS_FIXED_WINDOW"; +const TIME_SCALE_SESSION_STORAGE_KEY = "time_scale"; /** * TimeWindow represents an absolute window of time, defined with a start and @@ -58,7 +64,15 @@ export class TimeScaleState { }; constructor() { - this.scale = { + let timeScale: TimeScale; + try { + timeScale = getValueFromSessionStorage(TIME_SCALE_SESSION_STORAGE_KEY); + } catch { + console.warn( + `Couldn't retrieve or parse TimeScale options from SessionStorage`, + ); + } + this.scale = timeScale || { ...defaultTimeScaleOptions["Past 10 Minutes"], key: "Past 10 Minutes", fixedWindowEnd: false, @@ -197,3 +211,9 @@ export const adjustTimeScale = ( } return result; }; + +export function* timeScaleSaga() { + yield takeEvery(SET_SCALE, function* ({ payload }: PayloadAction) { + yield put(setLocalSetting(TIME_SCALE_SESSION_STORAGE_KEY, payload)); + }); +} diff --git a/vendor b/vendor index 107b373f7b40..9a2ce8ad3dcb 160000 --- a/vendor +++ b/vendor @@ -1 +1 @@ -Subproject commit 107b373f7b40bf16889147257872b69623430a5d +Subproject commit 9a2ce8ad3dcbca3136d89954f3e95abac1cac5de