From 3d3910b15061844ff809bcb6fa8c0ca52f8fcf95 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Tue, 4 Apr 2023 11:03:07 -0400 Subject: [PATCH 01/11] rpc: remove debug logging This was added recently, in #94778, and contributes to log spam of the following sort: I230404 15:00:33.826337 2400 rpc/context.go:2249 [T1,n1,rnode=2,raddr=127.0.0.1:55941,class=default,rpc] 268 connection heartbeat loop ended with err: I230404 15:00:33.826338 3986 rpc/context.go:2249 [T1,n2,rnode=3,raddr=127.0.0.1:55955,class=system,rpc] 269 connection heartbeat loop ended with err: I230404 15:00:33.826367 3455 rpc/context.go:2249 [T1,n2,rnode=3,raddr=127.0.0.1:55955,class=default,rpc] 270 connection heartbeat loop ended with err: I230404 15:00:33.826394 3354 rpc/context.go:2249 [T1,n2,rnode=2,raddr=127.0.0.1:55941,class=default,rpc] 271 connection heartbeat loop ended with err: Release note: None --- pkg/rpc/context.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index 097cb297c42e..72cea1abf667 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -2298,8 +2298,7 @@ func (rpcCtx *Context) grpcDialNodeInternal( // Run the heartbeat; this will block until the connection breaks for // whatever reason. We don't actually have to do anything with the error, // so we ignore it. - err := rpcCtx.runHeartbeat(ctx, conn, target) - log.Infof(ctx, "connection heartbeat loop ended with err: %v", err) + _ = rpcCtx.runHeartbeat(ctx, conn, target) maybeFatal(ctx, rpcCtx.m.Remove(k, conn)) // Context gets canceled on server shutdown, and if that's likely why From de709fab1eb5409883afa8ea3f8e411afb161ece Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Tue, 9 May 2023 12:37:47 -0700 Subject: [PATCH 02/11] workload: introduce timeout for pre-warming connection pool Interrupting target instances during prewarming shouldn't cause workload to proceed: introduce a timeout to prewarming connections. Connections will have 15s to 5min to warmup before the context will expire. Epic: none Release note: None --- pkg/workload/pgx_helpers.go | 87 ++++++++++++++++++++++--------------- 1 file changed, 51 insertions(+), 36 deletions(-) diff --git a/pkg/workload/pgx_helpers.go b/pkg/workload/pgx_helpers.go index 16d04725e7ee..b913d1562145 100644 --- a/pkg/workload/pgx_helpers.go +++ b/pkg/workload/pgx_helpers.go @@ -252,18 +252,15 @@ func (m *MultiConnPool) Method() pgx.QueryExecMode { return m.method } -// WarmupConns warms up numConns connections across all pools contained within -// MultiConnPool. The max number of connections are warmed up if numConns is -// set to 0. -func (m *MultiConnPool) WarmupConns(ctx context.Context, numConns int) error { - if numConns < 0 { +// WarmupConns warms up totalNumConns connections distributed across all pools +// contained within MultiConnPool. The max number of connections are warmed up +// if totalNumConns is set to 0. If totalNumConns is less than 0, no +// pre-warming of connections is performed. +func (m *MultiConnPool) WarmupConns(ctx context.Context, totalNumConns int) error { + if totalNumConns < 0 { return nil } - // NOTE(seanc@): see context cancellation note below. - warmupCtx, cancel := context.WithCancel(ctx) - defer cancel() - // "Warm up" the pools so we don't have to establish connections later (which // would affect the observed latencies of the first requests, especially when // prepared statements are used). 
We do this by @@ -279,32 +276,57 @@ func (m *MultiConnPool) WarmupConns(ctx context.Context, numConns int) error { // (128). g.SetLimit(100) - var warmupConnsPerPool []int - if numConns == 0 { - warmupConnsPerPool = make([]int, len(m.Pools)) - for i, p := range m.Pools { - warmupConnsPerPool[i] = int(p.Config().MaxConns) + type warmupPool struct { + maxConns int + pool *pgxpool.Pool + } + + warmupPools := make([]warmupPool, len(m.Pools)) + var numWarmupConns int + numConnsPerPool := distribute(totalNumConns, len(m.Pools)) + for i, p := range m.Pools { + poolMaxConns := int(p.Config().MaxConns) + + // Tune max conns for the pool + switch { + case totalNumConns == 0 && poolMaxConns > 0: + warmupPools[i].maxConns = poolMaxConns + case totalNumConns == 0: + warmupPools[i].maxConns = 1 // always at least one connection + default: + warmupPools[i].maxConns = numConnsPerPool[i] } - } else { - warmupConnsPerPool = distribute(numConns, len(m.Pools)) - for i, p := range m.Pools { - poolMaxConns := int(p.Config().MaxConns) - if warmupConnsPerPool[i] > poolMaxConns { - warmupConnsPerPool[i] = poolMaxConns - } + + // Clamp max conns per pool + if warmupPools[i].maxConns > poolMaxConns { + warmupPools[i].maxConns = poolMaxConns } + + warmupPools[i].pool = p + numWarmupConns += warmupPools[i].maxConns } - var numWarmupConns int - for _, n := range warmupConnsPerPool { - numWarmupConns += n + // NOTE(seanc@): see context cancellation note below. + // TODO(seanc@): Change WithTimeout() back to WithCancel() + const maxWarmupTime = 5 * time.Minute // NOTE(seanc@): 5min == AWS NLB TCP idle time + const minWarmupTime = 15 * time.Second + const maxTimePerConn = 200 * time.Millisecond + warmupTime := minWarmupTime + if int(warmupTime) < numWarmupConns*int(maxTimePerConn) { + warmupTime = time.Duration(numWarmupConns * int(maxTimePerConn)) } + if warmupTime > maxWarmupTime { + warmupTime = maxWarmupTime + } + ctx, cancel := context.WithTimeout(ctx, warmupTime) + defer cancel() + warmupConns := make(chan *pgxpool.Conn, numWarmupConns) - for i, p := range m.Pools { + for _, p := range warmupPools { p := p - for k := 0; k < warmupConnsPerPool[i]; k++ { + for i := 0; i < p.maxConns; i++ { g.Go(func() error { - conn, err := p.Acquire(warmupCtx) + conn, err := p.pool.Acquire(ctx) if err != nil { return err } @@ -317,23 +339,16 @@ func (m *MultiConnPool) WarmupConns(ctx context.Context, numConns int) error { estConns := make([]*pgxpool.Conn, 0, numWarmupConns) defer func() { for _, conn := range estConns { - // NOTE(seanc@): Release() connections before canceling the warmupCtx to - // prevent partially established connections from being Acquire()'ed. 
conn.Release() } }() -WARMUP: for i := 0; i < numWarmupConns; i++ { select { case conn := <-warmupConns: estConns = append(estConns, conn) - case <-warmupCtx.Done(): - if err := warmupCtx.Err(); err != nil { - return err - } - - break WARMUP + case <-ctx.Done(): + return ctx.Err() } } From 9f7143312ceff9b17ac77053a643677160a4fd3f Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Tue, 9 May 2023 12:55:26 -0700 Subject: [PATCH 03/11] workload: log a warning instead of erroring when warming up connections Epic: none Release note: None Fixes: #102687 --- pkg/workload/pgx_helpers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/workload/pgx_helpers.go b/pkg/workload/pgx_helpers.go index b913d1562145..5bbff6266a08 100644 --- a/pkg/workload/pgx_helpers.go +++ b/pkg/workload/pgx_helpers.go @@ -214,7 +214,7 @@ func NewMultiConnPool( } if err := m.WarmupConns(ctx, cfg.WarmupConns); err != nil { - return nil, err + log.Warningf(ctx, "warming up connection pool failed (%v), continuing workload", err) } return m, nil From d16c0114baf1cf5dc3481ae8a536bdd1f3742bfd Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Tue, 9 May 2023 16:54:02 -0400 Subject: [PATCH 04/11] workload: add --scatter flag to kv workload The user can now run `./workload init kv --scatter ....` which scatters the kv table across the cluster after the initial data load. This flag is best used with `--splits`, `--max-block-bytes`, and `--insert-count`. Epic: none Release note: none --- pkg/workload/kv/kv.go | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/pkg/workload/kv/kv.go b/pkg/workload/kv/kv.go index 84007fba2733..3eec400c98c0 100644 --- a/pkg/workload/kv/kv.go +++ b/pkg/workload/kv/kv.go @@ -88,6 +88,7 @@ type kv struct { zipfian bool sfuDelay time.Duration splits int + scatter bool secondaryIndex bool shards int targetCompressionRatio float64 @@ -127,6 +128,7 @@ var kvMeta = workload.Meta{ `span-limit`: {RuntimeOnly: true}, `del-percent`: {RuntimeOnly: true}, `splits`: {RuntimeOnly: true}, + `scatter`: {RuntimeOnly: true}, `timeout`: {RuntimeOnly: true}, } g.flags.IntVar(&g.batchSize, `batch`, 1, @@ -157,6 +159,8 @@ var kvMeta = workload.Meta{ `previous --sequential run and R implies a previous random run.`) g.flags.IntVar(&g.splits, `splits`, 0, `Number of splits to perform before starting normal operations.`) + g.flags.BoolVar(&g.scatter, `scatter`, false, + `Scatter ranges before starting normal operations.`) g.flags.BoolVar(&g.secondaryIndex, `secondary-index`, false, `Add a secondary index to the schema.`) g.flags.IntVar(&g.shards, `num-shards`, 0, @@ -189,13 +193,20 @@ func (w *kv) Flags() workload.Flags { return w.flags } func (w *kv) Hooks() workload.Hooks { return workload.Hooks{ PostLoad: func(_ context.Context, db *gosql.DB) error { - if !w.enum { - return nil - } - _, err := db.Exec(` + if w.enum { + _, err := db.Exec(` CREATE TYPE enum_type AS ENUM ('v'); ALTER TABLE kv ADD COLUMN e enum_type NOT NULL AS ('v') STORED;`) - return err + if err != nil { + return err + } + } + if w.scatter { + if _, err := db.Exec(`ALTER TABLE kv SCATTER`); err != nil { + return err + } + } + return nil }, Validate: w.validateConfig, } From 91d80223468f7f62f3144034b523aa25086bb29f Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Mon, 8 May 2023 17:15:12 -0400 Subject: [PATCH 05/11] c2c: rename completeStreamIngestion to applyCutoverTime Release note: none --- pkg/ccl/streamingccl/streamingest/alter_replication_job.go | 2 +- 
pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go | 2 +- pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go | 5 +++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/ccl/streamingccl/streamingest/alter_replication_job.go b/pkg/ccl/streamingccl/streamingest/alter_replication_job.go index dbb3d46cedf0..7879ce03f36d 100644 --- a/pkg/ccl/streamingccl/streamingest/alter_replication_job.go +++ b/pkg/ccl/streamingccl/streamingest/alter_replication_job.go @@ -264,7 +264,7 @@ func alterTenantJobCutover( cutoverTime, record.Timestamp) } } - if err := completeStreamIngestion(ctx, jobRegistry, txn, tenInfo.TenantReplicationJobID, cutoverTime); err != nil { + if err := applyCutoverTime(ctx, jobRegistry, txn, tenInfo.TenantReplicationJobID, cutoverTime); err != nil { return hlc.Timestamp{}, err } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go b/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go index 4755b67c79af..dccdd0d10932 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go @@ -37,7 +37,7 @@ type streamIngestManagerImpl struct { func (r *streamIngestManagerImpl) CompleteStreamIngestion( ctx context.Context, ingestionJobID jobspb.JobID, cutoverTimestamp hlc.Timestamp, ) error { - return completeStreamIngestion(ctx, r.jobRegistry, r.txn, ingestionJobID, cutoverTimestamp) + return applyCutoverTime(ctx, r.jobRegistry, r.txn, ingestionJobID, cutoverTimestamp) } // GetStreamIngestionStats implements streaming.StreamIngestManager interface. diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index 22d85985ecf2..11417b83ccc5 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -38,8 +38,9 @@ import ( "github.com/cockroachdb/errors" ) -// completeStreamIngestion terminates the stream as of specified time. -func completeStreamIngestion( +// applyCutoverTime modifies the consumer job record with a cutover time and +// unpauses the job if necessary. +func applyCutoverTime( ctx context.Context, jobRegistry *jobs.Registry, txn isql.Txn, From dcab435bba9853e8dff53816e2e6535ed1b4a60f Mon Sep 17 00:00:00 2001 From: healthy-pod Date: Thu, 11 May 2023 08:30:27 -0700 Subject: [PATCH 06/11] generate-logic-test: fix incorrect timeout in logictests template In #102719, we changed the way we set `-test.timeout` but didn't update the logictests template. This code change updates the template. 
Release note: None Epic: none --- pkg/cmd/generate-logictest/templates.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/cmd/generate-logictest/templates.go b/pkg/cmd/generate-logictest/templates.go index e1e6d150b5cf..2178003399ca 100644 --- a/pkg/cmd/generate-logictest/templates.go +++ b/pkg/cmd/generate-logictest/templates.go @@ -257,7 +257,10 @@ go_test( size = "enormous", srcs = ["generated_test.go"],{{ if .SqliteLogicTest }} args = ["-test.timeout=7195s"],{{ else }} - args = ["-test.timeout=3595s"],{{ end }} + args = select({ + "//build/toolchains:use_ci_timeouts": ["-test.timeout=895s"], + "//conditions:default": ["-test.timeout=3595s"], + }),{{ end }} data = [ "//c-deps:libgeos", # keep{{ if .SqliteLogicTest }} "@com_github_cockroachdb_sqllogictest//:testfiles", # keep{{ end }}{{ if .CockroachGoTestserverTest }} From 093e2ddee34bac6492d7435e6e721c664c0d1785 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Wed, 26 Apr 2023 17:57:08 -0400 Subject: [PATCH 07/11] c2c: deflake c2c/shutdown roachtests This patch addresses two roachtest failure modes: - Prevents roachtest failure if a query fails during a node shutdown. - Prevents the src cluster from returning a single node topology, which could cause the stream ingestion job to hang if the participating src node gets shut down. Longer term, automatic replanning will prevent this. Specifically, this patch changes the kv workload to split and scatter the kv table across the cluster before the c2c job begins. Fixes #101898 Fixes #102111 This patch also makes it easier to reproduce c2c roachtest failures by plumbing a random seed to several components of the roachtest driver. Release note: None --- pkg/cmd/roachtest/option/node_list_option.go | 5 + pkg/cmd/roachtest/tests/cluster_to_cluster.go | 138 +++++++++++------- pkg/cmd/roachtest/tests/multitenant_utils.go | 2 + 3 files changed, 95 insertions(+), 50 deletions(-) diff --git a/pkg/cmd/roachtest/option/node_list_option.go b/pkg/cmd/roachtest/option/node_list_option.go index 475d78d2c956..5535a7eea2dd 100644 --- a/pkg/cmd/roachtest/option/node_list_option.go +++ b/pkg/cmd/roachtest/option/node_list_option.go @@ -73,6 +73,11 @@ func (n NodeListOption) RandNode() NodeListOption { return NodeListOption{n[rand.Intn(len(n))]} } +// SeededRandNode returns a random node from the NodeListOption using a seeded rand object. +func (n NodeListOption) SeededRandNode(rand *rand.Rand) NodeListOption { return NodeListOption{n[rand.Intn(len(n))]} } + // NodeIDsString returns the nodes in the NodeListOption, separated by spaces.
func (n NodeListOption) NodeIDsString() string { result := "" diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go index f8eea8294209..e87a73bbdf45 100644 --- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go +++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go @@ -34,11 +34,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/roachprod/install" + "github.com/cockroachdb/cockroach/pkg/roachprod/logger" "github.com/cockroachdb/cockroach/pkg/roachprod/prometheus" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -228,33 +230,33 @@ type replicateKV struct { readPercent int // This field is merely used to debug the c2c framework for finite workloads. - debugRunDurationMinutes int + debugRunDuration time.Duration - // initDuration, if nonzero, will pre-populate the src cluster - initDurationMinutes int + // the number of rows inserted into the cluster before c2c begins + initRows int + + // max size of raw data written during each insertion + maxBlockBytes int } func (kv replicateKV) sourceInitCmd(tenantName string, nodes option.NodeListOption) string { - if kv.initDurationMinutes == 0 { - return "" - } - return fmt.Sprintf(`./workload run kv --tolerate-errors --init --duration %dm --read-percent 0 {pgurl%s:%s}`, - kv.initDurationMinutes, - nodes, - tenantName) + cmd := roachtestutil.NewCommand(`./workload init kv`). + MaybeFlag(kv.initRows > 0, "insert-count", kv.initRows). + MaybeFlag(kv.initRows > 0, "max-block-bytes", kv.maxBlockBytes). + Flag("splits", 100). + Option("scatter"). + Arg("{pgurl%s:%s}", nodes, tenantName) + return cmd.String() } func (kv replicateKV) sourceRunCmd(tenantName string, nodes option.NodeListOption) string { - debugDuration := "" - if kv.debugRunDurationMinutes != 0 { - debugDuration = fmt.Sprintf("--duration %dm", kv.debugRunDurationMinutes) - } - // added --tolerate-errors flags to prevent test from flaking due to a transaction retry error - return fmt.Sprintf(`./workload run kv --tolerate-errors --init %s --read-percent %d {pgurl%s:%s}`, - debugDuration, - kv.readPercent, - nodes, - tenantName) + cmd := roachtestutil.NewCommand(`./workload run kv`). + Option("tolerate-errors"). + Flag("max-block-bytes", kv.maxBlockBytes). + Flag("read-percent", kv.readPercent). + MaybeFlag(kv.debugRunDuration > 0, "duration", kv.debugRunDuration). 
+ Arg("{pgurl%s:%s}", nodes, tenantName) + return cmd.String() } func (kv replicateKV) runDriver( @@ -338,13 +340,17 @@ type replicationDriver struct { t test.Test c cluster.Cluster metrics *c2cMetrics + rng *rand.Rand } func makeReplicationDriver(t test.Test, c cluster.Cluster, rs replicationSpec) replicationDriver { + rng, seed := randutil.NewTestRand() + t.L().Printf(`Random Seed is %d`, seed) return replicationDriver{ - t: t, - c: c, - rs: rs, + t: t, + c: c, + rs: rs, + rng: rng, } } @@ -367,8 +373,8 @@ func (rd *replicationDriver) setupC2C(ctx context.Context, t test.Test, c cluste dstClusterSetting := install.MakeClusterSettings(install.SecureOption(true)) c.Start(ctx, t.L(), dstStartOps, dstClusterSetting, dstCluster) - srcNode := srcCluster.RandNode() - destNode := dstCluster.RandNode() + srcNode := srcCluster.SeededRandNode(rd.rng) + destNode := dstCluster.SeededRandNode(rd.rng) addr, err := c.ExternalPGUrl(ctx, t.L(), srcNode, "") require.NoError(t, err) @@ -428,7 +434,12 @@ func (rd *replicationDriver) setupC2C(ctx context.Context, t test.Test, c cluste WithNodeExporter(rd.setup.dst.nodes.InstallNodes()). WithGrafanaDashboard("https://go.crdb.dev/p/changefeed-roachtest-grafana-dashboard") - require.NoError(rd.t, rd.c.StartGrafana(ctx, rd.t.L(), rd.setup.promCfg)) + // StartGrafana clutters the test.log. Try logging setup to a separate file. + promLog, err := rd.t.L().ChildLogger("prom_setup", logger.QuietStderr, logger.QuietStdout) + if err != nil { + promLog = rd.t.L() + } + require.NoError(rd.t, rd.c.StartGrafana(ctx, promLog, rd.setup.promCfg)) rd.t.L().Printf("Prom has started") } } @@ -527,13 +538,20 @@ func (rd *replicationDriver) getReplicationRetainedTime() time.Time { return retainedTime } -func (rd *replicationDriver) stopReplicationStream(ingestionJob int, cutoverTime time.Time) { +func (rd *replicationDriver) stopReplicationStream( + ctx context.Context, ingestionJob int, cutoverTime time.Time, +) { rd.setup.dst.sysSQL.Exec(rd.t, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, rd.setup.dst.name, cutoverTime) err := retry.ForDuration(time.Minute*5, func() error { var status string var payloadBytes []byte - rd.setup.dst.sysSQL.QueryRow(rd.t, `SELECT status, payload FROM crdb_internal.system_jobs WHERE id = $1`, - ingestionJob).Scan(&status, &payloadBytes) + res := rd.setup.dst.sysSQL.DB.QueryRowContext(ctx, `SELECT status, payload FROM crdb_internal.system_jobs WHERE id = $1`, ingestionJob) + if res.Err() != nil { + // This query can fail if a node shuts down during the query execution; + // therefore, tolerate errors. + return res.Err() + } + require.NoError(rd.t, res.Scan(&status, &payloadBytes)) if jobs.Status(status) == jobs.StatusFailed { payload := &jobspb.Payload{} if err := protoutil.Unmarshal(payloadBytes, payload); err == nil { @@ -593,6 +611,13 @@ func (rd *replicationDriver) main(ctx context.Context) { metricSnapper := rd.startStatsCollection(ctx) rd.preStreamingWorkload(ctx) + // Wait for initial workload to be properly replicated across the source cluster to increase + // the probability that the producer returns a topology with more than one node in it, + // else the node shutdown tests can flake. + if rd.rs.srcNodes >= 3 { + require.NoError(rd.t, WaitFor3XReplication(ctx, rd.t, rd.setup.src.db)) + } + rd.t.L().Printf("begin workload on src cluster") m := rd.newMonitor(ctx) // The roachtest driver can use the workloadCtx to cancel the workload. 
@@ -617,10 +642,12 @@ func (rd *replicationDriver) main(ctx context.Context) { rd.t.Status("starting replication stream") rd.metrics.initalScanStart = newMetricSnapshot(metricSnapper, timeutil.Now()) ingestionJobID := rd.startReplicationStream(ctx) - removeTenantRateLimiters(rd.t, rd.setup.dst.sysSQL, rd.setup.dst.name) - lv := makeLatencyVerifier("stream-ingestion", 0, 2*time.Minute, rd.t.L(), getStreamIngestionJobInfo, rd.t.Status, false) + // latency verifier queries may error during a node shutdown event; therefore + // tolerate errors if we anticipate node deaths. + lv := makeLatencyVerifier("stream-ingestion", 0, 2*time.Minute, rd.t.L(), + getStreamIngestionJobInfo, rd.t.Status, rd.rs.expectedNodeDeaths > 0) defer lv.maybeLogLatencyHist() m.Go(func(ctx context.Context) error { @@ -655,7 +682,7 @@ func (rd *replicationDriver) main(ctx context.Context) { rd.t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s", cutoverTime.String())) - rd.stopReplicationStream(ingestionJobID, cutoverTime) + rd.stopReplicationStream(ctx, ingestionJobID, cutoverTime) rd.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now()) rd.metrics.export(rd.t, len(rd.setup.src.nodes)) @@ -704,7 +731,7 @@ func runAcceptanceClusterReplication(ctx context.Context, t test.Test, c cluster dstNodes: 1, // The timeout field ensures the c2c roachtest driver behaves properly. timeout: 10 * time.Minute, - workload: replicateKV{readPercent: 0, debugRunDurationMinutes: 1}, + workload: replicateKV{readPercent: 0, debugRunDuration: 1 * time.Minute, maxBlockBytes: 1}, additionalDuration: 0 * time.Minute, cutover: 30 * time.Second, } @@ -751,19 +778,20 @@ func registerClusterToCluster(r registry.Registry) { dstNodes: 3, cpus: 8, pdSize: 100, - workload: replicateKV{readPercent: 0}, + workload: replicateKV{readPercent: 0, maxBlockBytes: 1024}, timeout: 1 * time.Hour, additionalDuration: 10 * time.Minute, cutover: 5 * time.Minute, tags: registry.Tags("aws"), }, { - name: "c2c/UnitTest", - srcNodes: 1, - dstNodes: 1, - cpus: 4, - pdSize: 10, - workload: replicateKV{readPercent: 0, debugRunDurationMinutes: 1}, + name: "c2c/UnitTest", + srcNodes: 1, + dstNodes: 1, + cpus: 4, + pdSize: 10, + workload: replicateKV{readPercent: 0, debugRunDuration: 1 * time.Minute, + maxBlockBytes: 1024}, timeout: 5 * time.Minute, additionalDuration: 0 * time.Minute, cutover: 30 * time.Second, @@ -868,7 +896,7 @@ func makeReplResilienceDriver( rd := makeReplicationDriver(t, c, rsp.replicationSpec) return replResilienceDriver{ replicationDriver: rd, - phase: c2cPhase(rand.Intn(int(phaseCutover) + 1)), + phase: c2cPhase(rd.rng.Intn(int(phaseCutover) + 1)), rsp: rsp, } } @@ -923,7 +951,7 @@ func (rrd *replResilienceDriver) getTargetAndWatcherNodes(ctx context.Context) { findAnotherNode := func(notThisNode int) int { for { - anotherNode := nodes.RandNode()[0] + anotherNode := nodes.SeededRandNode(rrd.rng)[0] if notThisNode != anotherNode { return anotherNode } @@ -960,12 +988,7 @@ func (rrd *replResilienceDriver) waitForTargetPhase() error { case currentPhase < rrd.phase: time.Sleep(5 * time.Second) case currentPhase == rrd.phase: - // Every C2C phase should last at least 30 seconds, so introduce a little - // bit of random waiting before node shutdown to ensure the shutdown occurs - // once we're settled into the target phase. - randomSleep := time.Duration(rand.Intn(6)) - rrd.t.L().Printf("In target phase! 
Take a %d second power nap", randomSleep) - time.Sleep(randomSleep * time.Second) + rrd.t.L().Printf("In target phase %s", currentPhase.String()) return nil default: return errors.New("c2c job past target phase") @@ -973,6 +996,16 @@ func (rrd *replResilienceDriver) waitForTargetPhase() error { } } +func (rrd *replResilienceDriver) sleepBeforeResiliencyEvent() { + // Assuming every C2C phase lasts at least 10 seconds, introduce some waiting + // before a resiliency event (e.g. a node shutdown) to ensure the event occurs + // once we're fully settled into the target phase (e.g. the stream ingestion + // processors have observed the cutover signal). + randomSleep := time.Duration(5+rrd.rng.Intn(6)) * time.Second + rrd.t.L().Printf("Take a %s power nap", randomSleep) + time.Sleep(randomSleep) +} + func registerClusterReplicationResilience(r registry.Registry) { for _, rsp := range []replResilienceSpec{ { @@ -999,7 +1032,7 @@ func registerClusterReplicationResilience(r registry.Registry) { srcNodes: 4, dstNodes: 4, cpus: 8, - workload: replicateKV{readPercent: 0, initDurationMinutes: 2}, + workload: replicateKV{readPercent: 0, initRows: 1000000, maxBlockBytes: 1024}, timeout: 20 * time.Minute, additionalDuration: 6 * time.Minute, cutover: 3 * time.Minute, @@ -1054,6 +1087,9 @@ func registerClusterReplicationResilience(r registry.Registry) { // Don't begin shutdown process until c2c job is set up. <-shutdownSetupDone + // Eagerly listen to cutover signal to exercise node shutdown during actual cutover. + rrd.setup.dst.sysSQL.Exec(t, `SET CLUSTER SETTING bulkio.stream_ingestion.cutover_signal_poll_interval='5s'`) + // While executing a node shutdown on either the src or destination // cluster, ensure the destination cluster's stream ingestion job // completes. If the stream producer job fails, no big deal-- in a real @@ -1061,7 +1097,9 @@ func registerClusterReplicationResilience(r registry.Registry) { // successful c2c replication execution. shutdownStarter := func() jobStarter { return func(c cluster.Cluster, t test.Test) (string, error) { - return fmt.Sprintf("%d", rrd.dstJobID), rrd.waitForTargetPhase() + require.NoError(t, rrd.waitForTargetPhase()) + rrd.sleepBeforeResiliencyEvent() + return fmt.Sprintf("%d", rrd.dstJobID), nil } } destinationWatcherNode := rrd.watcherNode diff --git a/pkg/cmd/roachtest/tests/multitenant_utils.go b/pkg/cmd/roachtest/tests/multitenant_utils.go index 2ca12644bcbe..7c5df2dc16a3 100644 --- a/pkg/cmd/roachtest/tests/multitenant_utils.go +++ b/pkg/cmd/roachtest/tests/multitenant_utils.go @@ -338,6 +338,8 @@ func createInMemoryTenant( sysSQL.Exec(t, "CREATE TENANT $1", tenantName) sysSQL.Exec(t, "ALTER TENANT $1 START SERVICE SHARED", tenantName) sysSQL.Exec(t, `ALTER TENANT $1 GRANT CAPABILITY can_view_node_info=true, can_admin_split=true,can_view_tsdb_metrics=true`, tenantName) + sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING sql.split_at.allow_for_secondary_tenant.enabled=true`, tenantName) + sysSQL.Exec(t, `ALTER TENANT $1 SET CLUSTER SETTING sql.scatter.allow_for_secondary_tenant.enabled=true`, tenantName) removeTenantRateLimiters(t, sysSQL, tenantName) From 58858834599b2d9b874a5777bb5ec20155bc22af Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Fri, 21 Apr 2023 11:42:36 +0200 Subject: [PATCH 08/11] cli/sql: print error details upon `\c` failures Prior to this patch, only the error message string was printed if `\c` fails. This patch ensures the hints/details are also printed. 
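As an illustrative aside (not part of this patch's diff): hints and details attached to an error via the cockroachdb/errors package are dropped by a plain print of the error, which is why the handler below stops using fmt.Fprintln. The sketch below uses real cockroachdb/errors APIs (errors.WithDetail, errors.WithHint, and the `%+v` verbose formatting verb), but the error text and annotations are invented for illustration only:

```go
package main

import (
	"fmt"
	"os"

	"github.com/cockroachdb/errors"
)

func main() {
	// A connection-style failure annotated with a detail and a hint,
	// the way CockroachDB errors commonly are.
	err := errors.New("connection refused")
	err = errors.WithDetail(err, "no server is listening at the target address")
	err = errors.WithHint(err, "verify the --host and --port parameters")

	// Plain print: only the message string reaches the user.
	fmt.Fprintln(os.Stderr, err)

	// Verbose rendering: the detail and hint annotations are included
	// (along with other metadata such as stack traces).
	fmt.Fprintf(os.Stderr, "%+v\n", err)
}
```

The `clierror.OutputError` call switched to in the diff below takes `showSeverity`/`verbose` flags and, per this commit's intent, surfaces the hint/detail payloads that the plain `Fprintln` dropped.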
Release note: None --- pkg/cli/clisqlshell/sql.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cli/clisqlshell/sql.go b/pkg/cli/clisqlshell/sql.go index a46c68cabf54..3715ebd836f1 100644 --- a/pkg/cli/clisqlshell/sql.go +++ b/pkg/cli/clisqlshell/sql.go @@ -1629,7 +1629,7 @@ func (c *cliState) handleConnect( cmd []string, loopState, errState cliStateEnum, ) (resState cliStateEnum) { if err := c.handleConnectInternal(cmd, false /*omitConnString*/); err != nil { - fmt.Fprintln(c.iCtx.stderr, err) + clierror.OutputError(c.iCtx.stderr, err, true /*showSeverity*/, false /*verbose*/) c.exitErr = err return errState } From dedeacfc30e302f760e7d6d89aa7820de42a7387 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Fri, 21 Apr 2023 11:50:55 +0200 Subject: [PATCH 09/11] cli/sql: new option `autocerts` for TLS client cert auto-discovery See the release note below. An additional benefit not mentioned in the release note is that it simplifies switching from one tenant to another when using shared-process multitenancy. For example, this becomes possible: ``` > CREATE TENANT foo; > ALTER TENANT foo START SERVICE SHARED; > \c cluster:foo root - - autocerts ``` Alternatively, this can also be used to quickly switch from a non-root user in an app tenant to the root user in the system tenant: ``` > \c cluster:system root - - autocerts ``` This works because (currently) all tenant servers running side-by-side use the same TLS CA to validate SQL client certs. Release note (cli change): The `\connect` client-side command for the SQL shell (included in `cockroach sql`, `cockroach demo`, `cockroach-sql`) now recognizes an option `autocerts` as last argument. When provided, `\c` will now try to discover a TLS client certificate and key in the same directory(ies) as used by the previous connection URL. This feature makes it easier to switch usernames when TLS client/key files are available for both the previous and the new username. --- pkg/cli/clisqlshell/BUILD.bazel | 1 + pkg/cli/clisqlshell/context.go | 4 + pkg/cli/clisqlshell/sql.go | 122 +++++++++++++++++- pkg/cli/demo.go | 2 + .../interactive_tests/test_connect_cmd.tcl | 18 ++- pkg/cli/sql_shell_cmd.go | 1 + pkg/cmd/cockroach-sql/main.go | 1 + 7 files changed, 144 insertions(+), 5 deletions(-) diff --git a/pkg/cli/clisqlshell/BUILD.bazel b/pkg/cli/clisqlshell/BUILD.bazel index f5cceb24007f..2da958ea419b 100644 --- a/pkg/cli/clisqlshell/BUILD.bazel +++ b/pkg/cli/clisqlshell/BUILD.bazel @@ -41,6 +41,7 @@ go_library( "//pkg/util/sysutil", "@com_github_charmbracelet_bubbles//cursor", "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_errors//oserror", "@com_github_knz_bubbline//:bubbline", "@com_github_knz_bubbline//computil", "@com_github_knz_bubbline//editline", diff --git a/pkg/cli/clisqlshell/context.go b/pkg/cli/clisqlshell/context.go index a69ab572e63a..4f1662c72981 100644 --- a/pkg/cli/clisqlshell/context.go +++ b/pkg/cli/clisqlshell/context.go @@ -53,6 +53,10 @@ type Context struct { // CockroachDB's own CLI package has a more advanced URL // parser that is used instead. ParseURL URLParser + + // CertsDir is an extra directory to look for client certs in, + // when the \c command is used. 
+ CertsDir string } // internalContext represents the internal configuration state of the diff --git a/pkg/cli/clisqlshell/sql.go b/pkg/cli/clisqlshell/sql.go index 3715ebd836f1..9e6d7de47dc9 100644 --- a/pkg/cli/clisqlshell/sql.go +++ b/pkg/cli/clisqlshell/sql.go @@ -42,6 +42,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/sysutil" "github.com/cockroachdb/errors" + "github.com/cockroachdb/errors/oserror" "github.com/knz/bubbline/editline" "github.com/knz/bubbline/history" ) @@ -66,9 +67,10 @@ Query Buffer Connection \info display server details including connection strings. - \c, \connect {[DB] [USER] [HOST] [PORT] | [URL]} + \c, \connect {[DB] [USER] [HOST] [PORT] [options] | [URL]} connect to a server or print the current connection URL. - (Omitted values reuse previous parameters. Use '-' to skip a field.) + Omitted values reuse previous parameters. Use '-' to skip a field. + The option "autocerts" attempts to auto-discover TLS client certs. \password [USERNAME] securely change the password for a user @@ -1691,10 +1693,21 @@ func (c *cliState) handleConnectInternal(cmd []string, omitConnString bool) erro return err } + var autoCerts bool + // Parse the arguments to \connect: - // it accepts newdb, user, host, port in that order. + // it accepts newdb, user, host, port and options in that order. // Each field can be marked as "-" to reuse the current defaults. switch len(cmd) { + case 5: + if cmd[4] != "-" { + if cmd[4] == "autocerts" { + autoCerts = true + } else { + return errors.Newf(`unknown syntax: \c %s`, strings.Join(cmd, " ")) + } + } + fallthrough case 4: if cmd[3] != "-" { if err := newURL.SetOption("port", cmd[3]); err != nil { @@ -1757,6 +1770,12 @@ func (c *cliState) handleConnectInternal(cmd []string, omitConnString bool) erro newURL.WithAuthn(prevAuthn) } + if autoCerts { + if err := autoFillClientCerts(newURL, currURL, c.sqlCtx.CertsDir); err != nil { + return err + } + } + if err := newURL.Validate(); err != nil { return errors.Wrap(err, "validating the new URL") } @@ -2712,3 +2731,100 @@ func (c *cliState) closeOutputFile() error { c.iCtx.queryOutputBuf = nil return err } + +// autoFillClientCerts tries to discover a TLS client certificate and key +// for use in newURL. This is used from the \connect command with option +// "autocerts". +func autoFillClientCerts(newURL, currURL *pgurl.URL, extraCertsDir string) error { + username := newURL.GetUsername() + // We could use methods from package "certnames" here but we're + // avoiding extra package dependencies for the sake of keeping + // the standalone shell binary (cockroach-sql) small. + desiredKeyFile := "client." + username + ".key" + desiredCertFile := "client." + username + ".crt" + // Try to discover a TLS client certificate and key. + // First we'll try to find them in the directory specified in the shell config. + // This is coming from --certs-dir on the command line (of COCKROACH_CERTS_DIR). + // + // If not found there, we'll try to find the client cert in the + // same directory as the cert in the original URL; and the key in + // the same directory as the key in the original URL (cert and key + // may be in different places). + // + // If the original URL doesn't have a cert, we'll try to find a + // cert in the directory where the CA cert is stored. + + // If all fails, we'll tell the user where we tried to search. 
+ candidateDirs := make(map[string]struct{}) + var newCert, newKey string + if extraCertsDir != "" { + candidateDirs[extraCertsDir] = struct{}{} + if candidateCert := filepath.Join(extraCertsDir, desiredCertFile); fileExists(candidateCert) { + newCert = candidateCert + } + if candidateKey := filepath.Join(extraCertsDir, desiredKeyFile); fileExists(candidateKey) { + newKey = candidateKey + } + } + if newCert == "" || newKey == "" { + var caCertDir string + if tlsUsed, _, caCertPath := currURL.GetTLSOptions(); tlsUsed { + caCertDir = filepath.Dir(caCertPath) + candidateDirs[caCertDir] = struct{}{} + } + var prevCertDir, prevKeyDir string + if authnCertEnabled, certPath, keyPath := currURL.GetAuthnCert(); authnCertEnabled { + prevCertDir = filepath.Dir(certPath) + prevKeyDir = filepath.Dir(keyPath) + candidateDirs[prevCertDir] = struct{}{} + candidateDirs[prevKeyDir] = struct{}{} + } + if newKey == "" { + if candidateKey := filepath.Join(prevKeyDir, desiredKeyFile); fileExists(candidateKey) { + newKey = candidateKey + } else if candidateKey := filepath.Join(caCertDir, desiredKeyFile); fileExists(candidateKey) { + newKey = candidateKey + } + } + if newCert == "" { + if candidateCert := filepath.Join(prevCertDir, desiredCertFile); fileExists(candidateCert) { + newCert = candidateCert + } else if candidateCert := filepath.Join(caCertDir, desiredCertFile); fileExists(candidateCert) { + newCert = candidateCert + } + } + } + if newCert == "" || newKey == "" { + err := errors.Newf("unable to find TLS client cert and key for user %q", username) + if len(candidateDirs) == 0 { + err = errors.WithHint(err, "No candidate directories; try specifying --certs-dir on the command line.") + } else { + sortedDirs := make([]string, 0, len(candidateDirs)) + for dir := range candidateDirs { + sortedDirs = append(sortedDirs, dir) + } + sort.Strings(sortedDirs) + err = errors.WithDetailf(err, "Candidate directories:\n- %s", strings.Join(sortedDirs, "\n- ")) + } + return err + } + + newURL.WithAuthn(pgurl.AuthnClientCert(newCert, newKey)) + + return nil +} + +func fileExists(path string) bool { + _, err := os.Stat(path) + if err == nil { + return true + } + if oserror.IsNotExist(err) { + return false + } + // Stat() returned an error that is not "does not exist". + // This is unexpected, but we'll treat it as if the file does exist. + // The connection will try to use the file, and then fail with a + // more specific error. 
+ return true +} diff --git a/pkg/cli/demo.go b/pkg/cli/demo.go index 93b2e8c81f88..db6e95a1fd2f 100644 --- a/pkg/cli/demo.go +++ b/pkg/cli/demo.go @@ -355,6 +355,8 @@ func runDemoInternal( } defer func() { resErr = errors.CombineErrors(resErr, conn.Close()) }() + _, _, certsDir := c.GetSQLCredentials() + sqlCtx.ShellCtx.CertsDir = certsDir sqlCtx.ShellCtx.ParseURL = clienturl.MakeURLParserFn(cmd, cliCtx.clientOpts) if err := extraInit(ctx, conn); err != nil { diff --git a/pkg/cli/interactive_tests/test_connect_cmd.tcl b/pkg/cli/interactive_tests/test_connect_cmd.tcl index 386917527ac5..7f12190d8633 100644 --- a/pkg/cli/interactive_tests/test_connect_cmd.tcl +++ b/pkg/cli/interactive_tests/test_connect_cmd.tcl @@ -110,17 +110,31 @@ eexpect foo@ eexpect "/system>" end_test +start_test "Check that the client-side connect cmd can change users with automatic client cert detection" +send "\\c - root - - autocerts\r" +eexpect "using new connection URL" +eexpect root@ +eexpect "/system>" +end_test + +start_test "Check that the auto-cert feature properly fails if certs were not found" +send "\\c - unknownuser - - autocerts\r" +eexpect "unable to find TLS client cert and key" +eexpect root@ +eexpect "/system>" +end_test + start_test "Check that the client-side connect cmd can detect syntax errors" send "\\c - - - - abc\r" eexpect "unknown syntax" -eexpect foo@ +eexpect root@ eexpect "/system>" end_test start_test "Check that the client-side connect cmd recognizes invalid URLs" send "\\c postgres://root@localhost:26257/defaultdb?sslmode=invalid&sslcert=$certs_dir%2Fclient.root.crt&sslkey=$certs_dir%2Fclient.root.key&sslrootcert=$certs_dir%2Fca.crt\r" eexpect "unrecognized sslmode parameter" -eexpect foo@ +eexpect root@ eexpect "/system>" end_test diff --git a/pkg/cli/sql_shell_cmd.go b/pkg/cli/sql_shell_cmd.go index 5b8ae110a2df..764b0491fc44 100644 --- a/pkg/cli/sql_shell_cmd.go +++ b/pkg/cli/sql_shell_cmd.go @@ -59,6 +59,7 @@ func runTerm(cmd *cobra.Command, args []string) (resErr error) { } defer func() { resErr = errors.CombineErrors(resErr, conn.Close()) }() + sqlCtx.ShellCtx.CertsDir = baseCfg.SSLCertsDir sqlCtx.ShellCtx.ParseURL = clienturl.MakeURLParserFn(cmd, cliCtx.clientOpts) return sqlCtx.Run(context.Background(), conn) } diff --git a/pkg/cmd/cockroach-sql/main.go b/pkg/cmd/cockroach-sql/main.go index 206b71a3d7fb..61b7351652b1 100644 --- a/pkg/cmd/cockroach-sql/main.go +++ b/pkg/cmd/cockroach-sql/main.go @@ -191,6 +191,7 @@ func runSQL(cmd *cobra.Command, args []string) (resErr error) { } defer func() { resErr = errors.CombineErrors(resErr, conn.Close()) }() + cfg.ShellCtx.CertsDir = copts.CertsDir cfg.ShellCtx.ParseURL = clienturl.MakeURLParserFn(cmd, copts) return cfg.Run(context.Background(), conn) } From aa4ab6a0c5cb7ae7e90b5f5035626172b3885b20 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Thu, 6 Apr 2023 16:55:46 -0400 Subject: [PATCH 10/11] admission: move CreateTime-sequencing below-raft We move kvflowsequencer.Sequencer and its use in kvflowhandle.Handle (above-raft) to admission.sequencer, now used by admission.StoreWorkQueue (below-raft). This variant appeared in an earlier revision of #97599 where we first introduced monotonically increasing CreateTimes for a given raft group. In a subsequent commit, when integrating kvflowcontrol into the critical path for replication traffic, we'll observe that it's quite difficult to create sequencing CreateTimes[^1] above raft. 
This is because these sequence numbers are encoded as part of the raft proposal[^2], and at encode-time, we don't actually know what log position the proposal is going to end up in. It's hard to explicitly guarantee that a proposal with log-position P1 will get encoded before another with log position P2, where P1 < P2. Naively sequencing CreateTimes at proposal-encode-time could result in over-admission. This is because of how we return flow tokens -- up to some log index[^3], and how we use these sequence numbers in below-raft WorkQueues. If P2 ends up with a lower sequence number/CreateTime, it would get admitted first, and when returning flow tokens by log position, in specifying up-to-P2, we'll early return P1's flow tokens despite it not being admitted. So we'd over-admit at the sender. This is all within a <tenant,priority> pair. [^1]: We use CreateTimes as "sequence numbers" in replication admission control. We want to assign each AC-queued work below-raft a "sequence number" for FIFO ordering within a <tenant,priority>. We ensure these timestamps are roughly monotonic with respect to log positions of replicated work by sequencing work in log position order. [^2]: In kvflowcontrolpb.RaftAdmissionMeta. [^3]: See kvflowcontrolpb.AdmittedRaftLogEntries. Release note: None --- pkg/BUILD.bazel | 4 -- pkg/kv/kvserver/kvflowcontrol/doc.go | 2 +- .../kvserver/kvflowcontrol/kvflowcontrol.go | 8 +-- .../kvflowcontroller/kvflowcontroller.go | 5 +- .../kvflowcontrol/kvflowhandle/BUILD.bazel | 2 - .../kvflowhandle/kvflowhandle.go | 23 +++---- .../kvflowhandle/kvflowhandle_test.go | 67 +------------------ .../kvflowcontrol/kvflowsequencer/BUILD.bazel | 29 -------- .../kvflowsimulator/simulation_test.go | 2 +- pkg/util/admission/BUILD.bazel | 2 + .../admission}/sequencer.go | 46 +++++++------ .../admission}/sequencer_test.go | 33 ++------- ...h_create_time_low_position_different_range | 2 +- .../high_create_time_low_position_same_range | 52 ++++++++++++++ .../tenant_fairness | 3 +- .../admission}/testdata/sequencer | 9 +-- pkg/util/admission/work_queue.go | 59 +++++++++++++++- 17 files changed, 166 insertions(+), 182 deletions(-) delete mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/BUILD.bazel rename pkg/{kv/kvserver/kvflowcontrol/kvflowsequencer => util/admission}/sequencer.go (52%) rename pkg/{kv/kvserver/kvflowcontrol/kvflowsequencer => util/admission}/sequencer_test.go (64%) create mode 100644 pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_same_range rename pkg/{kv/kvserver/kvflowcontrol/kvflowsequencer => util/admission}/testdata/sequencer (91%) diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index a5ce75721742..27a0f33defc4 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -227,7 +227,6 @@ ALL_TESTS = [ "//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:kvflowdispatch_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle_test", - "//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer:kvflowsequencer_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker_test", "//pkg/kv/kvserver/kvstorage:kvstorage_test", @@ -1303,8 +1302,6 @@ GO_TARGETS = [ "//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:kvflowdispatch_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle", "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle_test", - "//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer:kvflowsequencer", -
"//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer:kvflowsequencer_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker_test", @@ -2770,7 +2767,6 @@ GET_X_DATA_TARGETS = [ "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:get_x_data", - "//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:get_x_data", "//pkg/kv/kvserver/kvserverbase:get_x_data", diff --git a/pkg/kv/kvserver/kvflowcontrol/doc.go b/pkg/kv/kvserver/kvflowcontrol/doc.go index ec784c1e1cd7..49910b2790d5 100644 --- a/pkg/kv/kvserver/kvflowcontrol/doc.go +++ b/pkg/kv/kvserver/kvflowcontrol/doc.go @@ -483,7 +483,7 @@ package kvflowcontrol // still queued after ~100ms, will trigger epoch-LIFO everywhere. // [^11]: See the implementation for kvflowcontrol.Dispatch. // [^12]: See UpToRaftLogPosition in AdmittedRaftLogEntries. -// [^13]: See kvflowsequencer.Sequencer and its use in kvflowhandle.Handle. +// [^13]: See admission.sequencer and its use in admission.StoreWorkQueue. // [^14]: See the high_create_time_low_position_different_range test case for // TestReplicatedWriteAdmission. // diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go index e55cb2acc4c7..de6641205a58 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go @@ -98,13 +98,11 @@ type Handle interface { // work with given priority along connected streams. The deduction is // tracked with respect to the specific raft log position it's expecting it // to end up in, log positions that monotonically increase. Requests are - // assumed to have been Admit()-ed first. The returned time.Time parameter - // is to be used as the work item's CreateTime when enqueueing in IO - // admission queues. + // assumed to have been Admit()-ed first. DeductTokensFor( - context.Context, admissionpb.WorkPriority, time.Time, + context.Context, admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Tokens, - ) time.Time + ) // ReturnTokensUpto returns all previously deducted tokens of a given // priority for all log positions less than or equal to the one specified. // It does for the specific stream. Once returned, subsequent attempts to diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go index 46c8e6b0cfb5..a18472414715 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go @@ -183,8 +183,9 @@ func (c *Controller) Admit( } } - // TODO(irfansharif): Use the create time for ordering among waiting - // requests. Integrate it with epoch-LIFO. + // TODO(irfansharif): Use CreateTime for ordering among waiting + // requests, integrate it with epoch-LIFO. See I12 from + // kvflowcontrol/doc.go. } // DeductTokens is part of the kvflowcontrol.Controller interface. 
diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/BUILD.bazel index ff07b3cc5958..0b4379c2a72b 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/BUILD.bazel +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/BUILD.bazel @@ -14,7 +14,6 @@ go_library( "//pkg/base", "//pkg/kv/kvserver/kvflowcontrol", "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", - "//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker", "//pkg/util/admission/admissionpb", "//pkg/util/hlc", @@ -40,7 +39,6 @@ go_test( "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/metric", - "//pkg/util/timeutil", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go index 1c43ffeb2ab7..0ff5ebf18a10 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go @@ -17,7 +17,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -41,7 +40,6 @@ type Handle struct { // (identified by their log positions) have been admitted below-raft, // streams disconnect, or the handle closed entirely. perStreamTokenTracker map[kvflowcontrol.Stream]*kvflowtokentracker.Tracker - sequencer *kvflowsequencer.Sequencer closed bool } } @@ -54,7 +52,6 @@ func New(controller kvflowcontrol.Controller, metrics *Metrics, clock *hlc.Clock clock: clock, } h.mu.perStreamTokenTracker = map[kvflowcontrol.Stream]*kvflowtokentracker.Tracker{} - h.mu.sequencer = kvflowsequencer.New() return h } @@ -104,31 +101,28 @@ func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct tim func (h *Handle) DeductTokensFor( ctx context.Context, pri admissionpb.WorkPriority, - ct time.Time, pos kvflowcontrolpb.RaftLogPosition, tokens kvflowcontrol.Tokens, -) time.Time { +) { if h == nil { // TODO(irfansharif): See TODO around nil receiver check in Admit(). - return ct + return } - ct, _ = h.deductTokensForInner(ctx, pri, ct, pos, tokens) - return ct + _ = h.deductTokensForInner(ctx, pri, pos, tokens) } func (h *Handle) deductTokensForInner( ctx context.Context, pri admissionpb.WorkPriority, - ct time.Time, pos kvflowcontrolpb.RaftLogPosition, tokens kvflowcontrol.Tokens, -) (sequence time.Time, streams []kvflowcontrol.Stream) { +) (streams []kvflowcontrol.Stream) { h.mu.Lock() defer h.mu.Unlock() if h.mu.closed { log.Errorf(ctx, "operating on a closed handle") - return ct, nil // unused return value in production code + return nil // unused return value in production code } for _, c := range h.mu.connections { @@ -136,7 +130,7 @@ func (h *Handle) deductTokensForInner( h.mu.perStreamTokenTracker[c.Stream()].Track(ctx, pri, tokens, pos) streams = append(streams, c.Stream()) } - return h.mu.sequencer.Sequence(ct), streams + return streams } // ReturnTokensUpto is part of the kvflowcontrol.Handle interface. 
@@ -322,9 +316,8 @@ func (h *Handle) TestingNonBlockingAdmit( func (h *Handle) TestingDeductTokensForInner( ctx context.Context, pri admissionpb.WorkPriority, - ct time.Time, pos kvflowcontrolpb.RaftLogPosition, tokens kvflowcontrol.Tokens, -) (time.Time, []kvflowcontrol.Stream) { - return h.deductTokensForInner(ctx, pri, ct, pos, tokens) +) []kvflowcontrol.Stream { + return h.deductTokensForInner(ctx, pri, pos, tokens) } diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go index 72f5d7e05724..a4f1182ac19f 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go @@ -26,7 +26,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/require" ) @@ -80,7 +79,7 @@ func TestHandleAdmit(t *testing.T) { // Connect a single stream at pos=0 and deplete all 16MiB of regular // tokens at pos=1. handle.ConnectStream(ctx, pos(0), stream) - handle.DeductTokensFor(ctx, admissionpb.NormalPri, time.Time{}, pos(1), kvflowcontrol.Tokens(16<<20 /* 16MiB */)) + handle.DeductTokensFor(ctx, admissionpb.NormalPri, pos(1), kvflowcontrol.Tokens(16<<20 /* 16MiB */)) // Invoke .Admit() in a separate goroutine, and test below whether // the goroutine is blocked. @@ -106,67 +105,3 @@ func TestHandleAdmit(t *testing.T) { }) } } - -// TestHandleSequencing tests the sequencing behavior of -// Handle.DeductTokensFor(), namely that we: -// - advance sequencing timestamps when the create-time advances; -// - advance sequencing timestamps when the log position advances. -func TestHandleSequencing(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - // tzero represents the t=0, the earliest possible time. All other - // create-time= is relative to this time. - var tzero = timeutil.Unix(0, 0) - - ctx := context.Background() - stream := kvflowcontrol.Stream{ - TenantID: roachpb.MustMakeTenantID(42), - StoreID: roachpb.StoreID(42), - } - pos := func(t, i uint64) kvflowcontrolpb.RaftLogPosition { - return kvflowcontrolpb.RaftLogPosition{Term: t, Index: i} - } - ct := func(d int64) time.Time { - return tzero.Add(time.Nanosecond * time.Duration(d)) - } - - const tokens = kvflowcontrol.Tokens(1 << 20 /* MiB */) - const normal = admissionpb.NormalPri - - registry := metric.NewRegistry() - clock := hlc.NewClockForTesting(nil) - controller := kvflowcontroller.New(registry, cluster.MakeTestingClusterSettings(), clock) - handle := kvflowhandle.New(controller, kvflowhandle.NewMetrics(registry), clock) - - // Test setup: handle is connected to a single stream at pos=1/0 and has - // deducted 1MiB of regular tokens at pos=1 ct=1. - handle.ConnectStream(ctx, pos(1, 0), stream) - seq0 := handle.DeductTokensFor(ctx, normal, ct(1), pos(1, 1), tokens) - - // If create-time advances, so does the sequencing timestamp. - seq1 := handle.DeductTokensFor(ctx, normal, ct(2), pos(1, 1), tokens) - require.Greater(t, seq1, seq0) - - // If stays static, the sequencing timestamp - // still advances. - seq2 := handle.DeductTokensFor(ctx, normal, ct(2), pos(1, 1), tokens) - require.Greater(t, seq2, seq1) - - // If the log index advances, so does the sequencing timestamp. 
- seq3 := handle.DeductTokensFor(ctx, normal, ct(3), pos(1, 2), tokens) - require.Greater(t, seq3, seq2) - - // If the log term advances, so does the sequencing timestamp. - seq4 := handle.DeductTokensFor(ctx, normal, ct(3), pos(2, 2), tokens) - require.Greater(t, seq4, seq3) - - // If both the create-time and log-position advance, so does the sequencing - // timestamp. - seq5 := handle.DeductTokensFor(ctx, normal, ct(1000), pos(4, 20), tokens) - require.Greater(t, seq5, seq4) - - // Verify that the sequencing timestamp is kept close to the maximum - // observed create-time. - require.LessOrEqual(t, seq5.Sub(ct(1000)), time.Nanosecond) -} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/BUILD.bazel deleted file mode 100644 index 4b7b66091b3e..000000000000 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/BUILD.bazel +++ /dev/null @@ -1,29 +0,0 @@ -load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") -load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") - -go_library( - name = "kvflowsequencer", - srcs = ["sequencer.go"], - importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer", - visibility = ["//visibility:public"], - deps = ["//pkg/util/timeutil"], -) - -go_test( - name = "kvflowsequencer_test", - srcs = ["sequencer_test.go"], - args = ["-test.timeout=295s"], - data = glob(["testdata/**"]), - embed = [":kvflowsequencer"], - deps = [ - "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", - "//pkg/testutils/datapathutils", - "//pkg/util/leaktest", - "//pkg/util/log", - "//pkg/util/timeutil", - "@com_github_cockroachdb_datadriven//:datadriven", - "@com_github_stretchr_testify//require", - ], -) - -get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/simulation_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/simulation_test.go index 6cdc943cfc32..d898c39ce43a 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/simulation_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/simulation_test.go @@ -630,7 +630,7 @@ func (h *replicaHandle) deductTokens( // Increment the quorum log position -- all token deductions are bound to // incrementing log positions. 
h.quorumLogPosition.Index += 1 - _, streams := h.handle.TestingDeductTokensForInner(ctx, pri, time.Time{}, h.quorumLogPosition, tokens) + streams := h.handle.TestingDeductTokensForInner(ctx, pri, h.quorumLogPosition, tokens) for _, stream := range streams { h.deductionTracker[stream].Track(ctx, pri, tokens, h.quorumLogPosition) } diff --git a/pkg/util/admission/BUILD.bazel b/pkg/util/admission/BUILD.bazel index 3ed968ba72fa..e2dbecf8603b 100644 --- a/pkg/util/admission/BUILD.bazel +++ b/pkg/util/admission/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "kv_slot_adjuster.go", "pacer.go", "scheduler_latency_listener.go", + "sequencer.go", "sql_cpu_overload_indicator.go", "store_token_estimation.go", "testing_knobs.go", @@ -56,6 +57,7 @@ go_test( "io_load_listener_test.go", "replicated_write_admission_test.go", "scheduler_latency_listener_test.go", + "sequencer_test.go", "store_token_estimation_test.go", "tokens_linear_model_test.go", "work_queue_test.go", diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer.go b/pkg/util/admission/sequencer.go similarity index 52% rename from pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer.go rename to pkg/util/admission/sequencer.go index 9cc0271f4257..05b389cc9310 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer.go +++ b/pkg/util/admission/sequencer.go @@ -8,15 +8,9 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package kvflowsequencer +package admission -import ( - "time" - - "github.com/cockroachdb/cockroach/pkg/util/timeutil" -) - -// Sequencer issues monotonic sequencing timestamps derived from observed +// sequencer issues monotonic sequencing timestamps derived from observed // CreateTimes. This is a purpose-built data structure for replication admission // control where we want to assign each AC-queued work below-raft a "sequence // number" for FIFO ordering within a . We ensure timestamps @@ -27,9 +21,29 @@ import ( // // It's not safe for concurrent access. // +// ---- +// +// Aside: Why not do this CreateTime-generation above raft? This is because these +// sequence numbers are encoded as part of the raft proposal[3], and at +// encode-time, we don't actually know what log position the proposal is going +// to end up in. It's hard to explicitly guarantee that a proposal with +// log-position P1 will get encoded before another with log position P2, where +// P1 < P2. +// +// If we tried to "approximate" CreateTimes at proposal-encode-time, +// approximating log position order, it could result in over-admission. This is +// because of how we return flow tokens -- up to some log index[4], and how use +// these sequence numbers in below-raft WorkQueues. If P2 ends up with a lower +// sequence number/CreateTime, it would get admitted first, and when returning +// flow tokens by log position, in specifying up-to-P2, we'll early return P1's +// flow tokens despite it not being admitted. So we'd over-admit at the sender. +// This is all within a pair. +// // [1]: See I12 from kvflowcontrol/doc.go. -// [2]: See kvflowhandle.Handle. -type Sequencer struct { +// [2]: See kvadmission.AdmitRaftEntry. +// [3]: In kvflowcontrolpb.RaftAdmissionMeta. +// [4]: See kvflowcontrolpb.AdmittedRaftLogEntries. +type sequencer struct { // maxCreateTime ratchets to the highest observed CreateTime. If sequencing // work with lower CreateTimes, we continue generating monotonic sequence // numbers by incrementing it for every such sequencing attempt. 
Provided @@ -38,18 +52,12 @@ type Sequencer struct { maxCreateTime int64 } -// New returns a new Sequencer. -func New() *Sequencer { - return &Sequencer{} -} - -// Sequence returns a monotonically increasing timestamps derived from the +// sequence returns a monotonically increasing timestamps derived from the // provided CreateTime. -func (s *Sequencer) Sequence(ct time.Time) time.Time { - createTime := ct.UnixNano() +func (s *sequencer) sequence(createTime int64) int64 { if createTime <= s.maxCreateTime { createTime = s.maxCreateTime + 1 } s.maxCreateTime = createTime - return timeutil.FromUnixNanos(createTime) + return createTime } diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer_test.go b/pkg/util/admission/sequencer_test.go similarity index 64% rename from pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer_test.go rename to pkg/util/admission/sequencer_test.go index cc97d1a86047..968c47ae2629 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer_test.go +++ b/pkg/util/admission/sequencer_test.go @@ -8,16 +8,13 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package kvflowsequencer +package admission import ( "fmt" - "strconv" - "strings" "testing" "time" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -30,13 +27,13 @@ func TestSequencer(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - var sequencer *Sequencer + var seq *sequencer var lastSeqNum int64 datadriven.RunTest(t, datapathutils.TestDataPath(t, "sequencer"), func(t *testing.T, d *datadriven.TestData) string { switch d.Cmd { case "init": - sequencer = New() + seq = &sequencer{} return "" case "sequence": @@ -49,8 +46,8 @@ func TestSequencer(t *testing.T) { // Parse log-position=/. logPosition := parseLogPosition(t, d) - _ = logPosition - sequenceNum := sequencer.Sequence(tzero.Add(dur)).UnixNano() + _ = logPosition // unused + sequenceNum := seq.sequence(tzero.Add(dur).UnixNano()) if lastSeqNum < sequenceNum { movement = " (advanced)" } @@ -67,23 +64,3 @@ func TestSequencer(t *testing.T) { }, ) } - -// tzero represents the t=0, the earliest possible time. All other -// create-time= is relative to this time. -var tzero = timeutil.Unix(0, 0) - -func parseLogPosition(t *testing.T, d *datadriven.TestData) kvflowcontrolpb.RaftLogPosition { - // Parse log-position=/. 
- var arg string - d.ScanArgs(t, "log-position", &arg) - inner := strings.Split(arg, "/") - require.Len(t, inner, 2) - term, err := strconv.Atoi(inner[0]) - require.NoError(t, err) - index, err := strconv.Atoi(inner[1]) - require.NoError(t, err) - return kvflowcontrolpb.RaftLogPosition{ - Term: uint64(term), - Index: uint64(index), - } -} diff --git a/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_different_range b/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_different_range index d36a0455822e..d7a28c2ff8bc 100644 --- a/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_different_range +++ b/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_different_range @@ -19,7 +19,7 @@ admit tenant=t1 pri=normal-pri create-time=1us size=1B range=r2 origin=n1 log-po # request with the lower create time sorts first despite having the higher log # position. Admission work queues order work based entirely on create-times, # and the assignment of monotonic create-times (WRT log positions) happens only -# within a range and by higher-level components -- kvflowcontrol.Handle. +# within a range by the StoreWorkQueue. print ---- physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B diff --git a/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_same_range b/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_same_range new file mode 100644 index 000000000000..3de81d4eb0b4 --- /dev/null +++ b/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_same_range @@ -0,0 +1,52 @@ +# Verify that we ignore create-time based ordering for replicated write +# admission when writes happen within the same range. + +init +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# Admit two requests, one created at t=5us but with a lower log position. +admit tenant=t1 pri=normal-pri create-time=5us size=1B range=r1 origin=n1 log-position=4/20 +---- +[regular] try-get=1B available=0B => insufficient tokens + +# And one created at t=1us but but higher log position. +admit tenant=t1 pri=normal-pri create-time=1us size=1B range=r1 origin=n1 log-position=4/21 +---- + +# Observe both waiting requests and physical admission stats. Note how the +# request with the lower log position sorts first despite having the higher +# create-time. The StoreWorkQueue sequences them by (ab)using the create-time +# parameter to get this log position ordering. +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=normal-pri create-time=5µs size=1B range=r1 origin=n1 log-position=4/20] + [1: pri=normal-pri create-time=5.001µs size=1B range=r1 origin=n1 log-position=4/21] +[elastic work queue]: len(tenant-heap)=0 + +# Produce 1B worth of regular tokens. +granter class=regular adjust-tokens=+1B +---- +[regular] 1B tokens available +[elastic] 0B tokens available + +# Grant admission to requests. Since we have 1B worth of tokens, and 2 waiting +# requests wanting 1B each, we're only able to admit one. Verify that it's the +# request with the lower log position despite the higher original create-time. 
+grant class=regular +---- +admitted [tenant=t1 pri=normal-pri create-time=5µs size=1B range=r1 origin=n1 log-position=4/20] + +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=1B + [0: pri=normal-pri create-time=5.001µs size=1B range=r1 origin=n1 log-position=4/21] +[elastic work queue]: len(tenant-heap)=0 + +# vim:ft=sh diff --git a/pkg/util/admission/testdata/replicated_write_admission/tenant_fairness b/pkg/util/admission/testdata/replicated_write_admission/tenant_fairness index c00240b461f7..3f0d04dbd18a 100644 --- a/pkg/util/admission/testdata/replicated_write_admission/tenant_fairness +++ b/pkg/util/admission/testdata/replicated_write_admission/tenant_fairness @@ -7,8 +7,7 @@ init # For two tenants t1 and t2, try to admit two requests of 1B each at # incrementing log positions. We specify create-times in log-position order for -# work within a given range, similar to what we do at the issuing client -# (kvflowcontrol.Handle). +# work within a given range, similar to what we do at the StoreWorkQueue level. admit tenant=t1 pri=normal-pri create-time=1.001us size=1B range=r1 origin=n1 log-position=4/20 ---- [regular] try-get=1B available=0B => insufficient tokens diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/testdata/sequencer b/pkg/util/admission/testdata/sequencer similarity index 91% rename from pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/testdata/sequencer rename to pkg/util/admission/testdata/sequencer index ea335538f940..97716928fec2 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/testdata/sequencer +++ b/pkg/util/admission/testdata/sequencer @@ -1,7 +1,8 @@ -# Walk through the basics of how the per-handle sequencer works. The -# log-position= parameter is not actually used in the implementation, but in -# typical usage we'd be sequencing work in log position order, and it's -# instructive to understand how sequencing timestamps are generated. +# Walk through the basics of how the below-raft replicated work sequencer +# works. The log-position= parameter is not actually used in the +# implementation, but in typical usage we'd be sequencing work in log position +# order, and it's instructive to understand how sequencing timestamps are +# generated. # # ----------------------------------------------------------------------------- # 1. Observe how the sequence numbers change with changing log positions (and diff --git a/pkg/util/admission/work_queue.go b/pkg/util/admission/work_queue.go index 8a97cfa900ca..1284f8bb7b1e 100644 --- a/pkg/util/admission/work_queue.go +++ b/pkg/util/admission/work_queue.go @@ -156,6 +156,14 @@ var epochLIFOQueueDelayThresholdToSwitchToLIFO = settings.RegisterDurationSettin return nil }).WithPublic() +var rangeSequencerGCThreshold = settings.RegisterDurationSetting( + settings.TenantWritable, + "admission.replication_control.range_sequencer_gc_threshold", + "the inactive duration for a range sequencer after it's garbage collected", + 5*time.Minute, + settings.NonNegativeDuration, +) + // WorkInfo provides information that is used to order work within an WorkQueue. // The WorkKind is not included as a field since an WorkQueue deals with a // single WorkKind. @@ -1773,9 +1781,13 @@ type StoreWorkQueue struct { // and observed L0 growth (which factors in state machine application). 
stats storeAdmissionStats } - stopCh chan struct{} - timeSource timeutil.TimeSource - settings *cluster.Settings + sequencersMu struct { + syncutil.Mutex + s map[roachpb.RangeID]*sequencer // cleaned up periodically + } + stopCh chan struct{} + timeSource timeutil.TimeSource + settings *cluster.Settings knobs *TestingKnobs } @@ -1824,6 +1836,9 @@ func (q *StoreWorkQueue) Admit( info.RequestedCount = q.mu.estimates.writeTokens q.mu.RUnlock() } + if info.ReplicatedWorkInfo.Enabled { + info.CreateTime = q.sequenceReplicatedWork(info.CreateTime, info.ReplicatedWorkInfo) + } enabled, err := q.q[wc].Admit(ctx, info.WorkInfo) if err != nil { @@ -2042,5 +2057,43 @@ func makeStoreWorkQueue( q.mu.estimates = storeRequestEstimates{ writeTokens: 1, } + + q.sequencersMu.s = make(map[roachpb.RangeID]*sequencer) + go func() { + ticker := time.NewTicker(30 * time.Second) + for { + select { + case <-ticker.C: + q.gcSequencers() + case <-q.stopCh: + return + } + } + }() return q } + +func (q *StoreWorkQueue) gcSequencers() { + q.sequencersMu.Lock() + defer q.sequencersMu.Unlock() + + for rangeID, seq := range q.sequencersMu.s { + maxCreateTime := timeutil.FromUnixNanos(seq.maxCreateTime) + if q.timeSource.Now().Sub(maxCreateTime) > rangeSequencerGCThreshold.Get(&q.settings.SV) { + delete(q.sequencersMu.s, rangeID) + } + } +} + +func (q *StoreWorkQueue) sequenceReplicatedWork(createTime int64, info ReplicatedWorkInfo) int64 { + q.sequencersMu.Lock() + seq, ok := q.sequencersMu.s[info.RangeID] + if !ok { + seq = &sequencer{} + q.sequencersMu.s[info.RangeID] = seq + } + q.sequencersMu.Unlock() + // We're assuming sequenceReplicatedWork is never invoked concurrently for a + // given RangeID. + return seq.sequence(createTime) +} From 05c6ae3da6fb56ae2c12b6c807d3f2a7b802504f Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Thu, 6 Apr 2023 17:49:32 -0400 Subject: [PATCH 11/11] admission: add intercept points for when replicated work gets admitted In a subsequent commit, when integrating kvflowcontrol into the critical path for replication traffic, we'll set up the return of flow tokens from the receiver node back to the sender once log entries get (asynchronously) admitted[^1]. So we need to intercept the exact points at which the virtually enqueued work items get admitted, since it all happens asynchronously[^2]. To that end we introduce the following interface: // OnLogEntryAdmitted is used to observe the specific entries // (identified by rangeID + log position) that were admitted. Since // admission control for log entries is asynchronous/non-blocking, // this allows callers to do requisite post-admission // bookkeeping. type OnLogEntryAdmitted interface { AdmittedLogEntry( origin roachpb.NodeID, /* node where the entry originated */ pri admissionpb.WorkPriority, /* admission priority of the entry */ storeID roachpb.StoreID, /* store on which the entry was admitted */ rangeID roachpb.RangeID, /* identifying range for the log entry */ pos LogPosition, /* log position of the entry that was admitted*/ ) } For now we pass in a no-op implementation in production code, but this will change shortly. Seeing as how the asynchronous admit interface is going to be the primary once once we enable replication admission control by default, for IO control, we no longer need the storeWriteDone interfaces and corresponding types. It's being used by our current (and soon-to-be legacy) above-raft IO admission control to inform granters of when the write was actually done, post-admission. 
For above-raft IO control, at admit-time we do not have sizing info for
the writes, so by intercepting these writes at write-done time we're
able to make any outstanding token adjustments in the granter.

To reflect this new world, we:

- Rename setAdmittedDoneModels to setLinearModels.
- Introduce a storeReplicatedWorkAdmittedInfo[^3]. It provides
  information about the size of replicated work once it's admitted
  (which happens asynchronously from the work itself). This lets us use
  the underlying linear models for L0 {writes,ingests} to deduct an
  appropriate number of tokens from the granter, for the admitted work
  size[^4].
- Rename the granterWithStoreWriteDone interface to
  granterWithStoreReplicatedWorkAdmitted.

We'll still intercept the actual point of admission for some token
adjustments, through the storeReplicatedWorkAdmittedLocked API shown
below. There are two callstacks through which this API gets invoked,
one where the coord.mu is already held, and one where it isn't. We
plumb this information through so the lock is acquired only if not
already held. The locking structure is unfortunate, but this was a
minimally invasive diff.

  storeReplicatedWorkAdmittedLocked(
    originalTokens int64,
    admittedInfo storeReplicatedWorkAdmittedInfo,
  ) (additionalTokens int64)

While here, we also export an admission.TestingReverseWorkPriorityDict.
There are at least three tests that have re-invented the wheel.

[^1]: This will happen through the kvflowcontrol.Dispatch interface
  introduced back in #97766, after integrating it with the RaftTransport
  layer.

[^2]: Introduced in #97599, for replicated write work.

[^3]: Identical to the previous StoreWorkDoneInfo.

[^4]: There's a peculiarity here in that at enqueuing-time we actually
  know the size of the write, so we could have deducted the right number
  of tokens upfront and avoided this post-admit granter token
  adjustment. We inherit this structure from earlier, and just leave a
  TODO for now.
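
As an aside, the following is a minimal, hypothetical sketch of what a
caller-side OnLogEntryAdmitted implementation could look like. The
package, type, and field names below are illustrative only and are not
part of this change (the real wiring lands in a subsequent commit); it
assumes the exported admission.LogPosition type referenced by the
interface above. The idea is to buffer admitted entries so that a
separate loop can later return flow tokens for them:

  package example // hypothetical package, for illustration only

  import (
      "sync"

      "github.com/cockroachdb/cockroach/pkg/roachpb"
      "github.com/cockroachdb/cockroach/pkg/util/admission"
      "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
  )

  // admittedEntry records a single admitted raft log entry (hypothetical).
  type admittedEntry struct {
      origin  roachpb.NodeID
      pri     admissionpb.WorkPriority
      storeID roachpb.StoreID
      rangeID roachpb.RangeID
      pos     admission.LogPosition
  }

  // recordingOnLogEntryAdmitted buffers admitted entries so that another
  // component (say, a flow token dispatch loop) can process them later.
  type recordingOnLogEntryAdmitted struct {
      mu      sync.Mutex
      pending []admittedEntry
  }

  var _ admission.OnLogEntryAdmitted = (*recordingOnLogEntryAdmitted)(nil)

  // AdmittedLogEntry implements admission.OnLogEntryAdmitted. It only
  // appends to an in-memory buffer and returns; it deliberately does no
  // real work since the callback can be invoked while the grant
  // coordinator's mutex is held.
  func (r *recordingOnLogEntryAdmitted) AdmittedLogEntry(
      origin roachpb.NodeID,
      pri admissionpb.WorkPriority,
      storeID roachpb.StoreID,
      rangeID roachpb.RangeID,
      pos admission.LogPosition,
  ) {
      r.mu.Lock()
      defer r.mu.Unlock()
      r.pending = append(r.pending, admittedEntry{
          origin: origin, pri: pri, storeID: storeID, rangeID: rangeID, pos: pos,
      })
  }

Keeping the callback this cheap mirrors the TODO in work_queue.go about
not extending coord.mu's critical section: anything heavier should be
handed off to a separate goroutine.
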
Release note: None --- .../kvflowdispatch/kvflowdispatch_test.go | 7 +- .../kvflowtokentracker/tracker_test.go | 15 +-- pkg/server/server.go | 2 +- pkg/util/admission/admission.go | 40 ++++--- pkg/util/admission/admissionpb/admissionpb.go | 13 +- pkg/util/admission/elastic_cpu_work_handle.go | 5 +- pkg/util/admission/grant_coordinator.go | 28 ++++- pkg/util/admission/granter.go | 47 ++++---- pkg/util/admission/granter_test.go | 16 +-- pkg/util/admission/io_load_listener.go | 6 +- pkg/util/admission/io_load_listener_test.go | 4 +- .../replicated_write_admission_test.go | 26 ++-- pkg/util/admission/store_token_estimation.go | 16 ++- .../admission/store_token_estimation_test.go | 2 +- pkg/util/admission/work_queue.go | 111 ++++++++++++++---- pkg/util/admission/work_queue_test.go | 18 ++- 16 files changed, 233 insertions(+), 123 deletions(-) diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go index 5caf3f67343a..6610f902ba15 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch_test.go @@ -31,11 +31,6 @@ func TestDispatch(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - reverseWorkPriorityDict := make(map[string]admissionpb.WorkPriority) - for k, v := range admissionpb.WorkPriorityDict { - reverseWorkPriorityDict[v] = k - } - datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) { var dispatch *Dispatch datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { @@ -82,7 +77,7 @@ func TestDispatch(t *testing.T) { case strings.HasPrefix(parts[i], "pri="): // Parse pri=. - pri, found := reverseWorkPriorityDict[arg] + pri, found := admissionpb.TestingReverseWorkPriorityDict[arg] require.True(t, found) entries.AdmissionPriority = int32(pri) diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker_test.go index 858bda480797..7793501e0b75 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker_test.go @@ -32,19 +32,10 @@ func TestTracker(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - reverseWorkPriorityDict := make(map[string]admissionpb.WorkPriority) - for k, v := range admissionpb.WorkPriorityDict { - reverseWorkPriorityDict[v] = k - } - ctx := context.Background() datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) { var tracker *Tracker - knobs := &kvflowcontrol.TestingKnobs{ - UntrackTokensInterceptor: func(tokens kvflowcontrol.Tokens, pos kvflowcontrolpb.RaftLogPosition) { - - }, - } + knobs := &kvflowcontrol.TestingKnobs{} datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { switch d.Cmd { case "init": @@ -73,7 +64,7 @@ func TestTracker(t *testing.T) { switch { case strings.HasPrefix(parts[i], "pri="): var found bool - pri, found = reverseWorkPriorityDict[arg] + pri, found = admissionpb.TestingReverseWorkPriorityDict[arg] require.True(t, found) case strings.HasPrefix(parts[i], "tokens="): @@ -103,7 +94,7 @@ func TestTracker(t *testing.T) { var priStr, logPositionStr string d.ScanArgs(t, "pri", &priStr) d.ScanArgs(t, "up-to-log-position", &logPositionStr) - pri, found := reverseWorkPriorityDict[priStr] + pri, found := admissionpb.TestingReverseWorkPriorityDict[priStr] require.True(t, 
found) logPosition := parseLogPosition(t, logPositionStr) diff --git a/pkg/server/server.go b/pkg/server/server.go index c4f6a4f8469b..d422e07be84f 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -269,7 +269,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { if opts, ok := cfg.TestingKnobs.AdmissionControl.(*admission.Options); ok { admissionOptions.Override(opts) } - gcoords := admission.NewGrantCoordinators(cfg.AmbientCtx, st, admissionOptions, registry) + gcoords := admission.NewGrantCoordinators(cfg.AmbientCtx, st, admissionOptions, registry, &admission.NoopOnLogEntryAdmitted{}) engines, err := cfg.CreateEngines(ctx) if err != nil { diff --git a/pkg/util/admission/admission.go b/pkg/util/admission/admission.go index e59d16927952..5a79d6d358c8 100644 --- a/pkg/util/admission/admission.go +++ b/pkg/util/admission/admission.go @@ -178,7 +178,7 @@ type granter interface { // is a possibility that that raced with cancellation. // // Do not use this for doing store IO-related token adjustments when work is - // done -- that should be done via granterWithStoreWriteDone.storeWriteDone. + // done -- that should be done via granterWithStoreReplicatedWorkAdmitted.storeWriteDone. // // REQUIRES: count > 0. count == 1 for slots. returnGrant(count int64) @@ -195,7 +195,7 @@ type granter interface { // work turned out to be an underestimate. // // Do not use this for doing store IO-related token adjustments when work is - // done -- that should be done via granterWithStoreWriteDone.storeWriteDone. + // done -- that should be done via granterWithStoreReplicatedWorkAdmitted.storeWriteDone. // // REQUIRES: count > 0. count == 1 for slots. tookWithoutPermission(count int64) @@ -274,23 +274,33 @@ type granterWithIOTokens interface { // getDiskTokensUsedAndReset returns the disk bandwidth tokens used // since the last such call. getDiskTokensUsedAndReset() [admissionpb.NumWorkClasses]int64 - // setAdmittedDoneModelsLocked supplies the models to use when - // storeWriteDone is called, to adjust token consumption. Note that these - // models are not used for token adjustment at admission time -- that is - // handled by StoreWorkQueue and is not in scope of this granter. This - // asymmetry is due to the need to use all the functionality of WorkQueue at - // admission time. See the long explanatory comment at the beginning of - // store_token_estimation.go, regarding token estimation. - setAdmittedDoneModels(l0WriteLM tokensLinearModel, l0IngestLM tokensLinearModel, - ingestLM tokensLinearModel) + // setLinearModels supplies the models to use when storeWriteDone or + // storeReplicatedWorkAdmittedLocked is called, to adjust token consumption. + // Note that these models are not used for token adjustment at admission + // time -- that is handled by StoreWorkQueue and is not in scope of this + // granter. This asymmetry is due to the need to use all the functionality + // of WorkQueue at admission time. See the long explanatory comment at the + // beginning of store_token_estimation.go, regarding token estimation. + setLinearModels(l0WriteLM, l0IngestLM, ingestLM tokensLinearModel) } -// granterWithStoreWriteDone is used to abstract kvStoreTokenGranter for -// testing. The interface is used by StoreWorkQueue to pass on sizing -// information provided when the work was completed. -type granterWithStoreWriteDone interface { +// granterWithStoreReplicatedWorkAdmitted is used to abstract +// kvStoreTokenGranter for testing. 
The interface is used by StoreWorkQueue to +// pass on sizing information provided when the work is either done (for legacy, +// above-raft IO admission) or admitted (for below-raft, asynchronous admission +// control. +type granterWithStoreReplicatedWorkAdmitted interface { granter + // storeWriteDone is used by legacy, above-raft IO admission control to + // inform granters of when the write was actually done, post-admission. At + // admit-time we did not have sizing info for these writes, so by + // intercepting these writes at admit time we're able to make any + // outstanding token adjustments in the granter. storeWriteDone(originalTokens int64, doneInfo StoreWorkDoneInfo) (additionalTokens int64) + // storeReplicatedWorkAdmittedLocked is used by below-raft admission control + // to inform granters of work being admitted in order for them to make any + // outstanding token adjustments. It's invoked with the coord.mu held. + storeReplicatedWorkAdmittedLocked(originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo) (additionalTokens int64) } // cpuOverloadIndicator is meant to be an instantaneous indicator of cpu diff --git a/pkg/util/admission/admissionpb/admissionpb.go b/pkg/util/admission/admissionpb/admissionpb.go index 57dc9911080f..c230a5d11c38 100644 --- a/pkg/util/admission/admissionpb/admissionpb.go +++ b/pkg/util/admission/admissionpb/admissionpb.go @@ -53,7 +53,7 @@ func (w WorkPriority) SafeFormat(p redact.SafePrinter, verb rune) { p.Print(s) return } - p.Printf("custom-pri=%d", w) + p.Printf("custom-pri=%d", int8(w)) } // WorkPriorityDict is a mapping of the priorities to a short string name. The @@ -69,6 +69,17 @@ var WorkPriorityDict = map[WorkPriority]string{ HighPri: "high-pri", } +// TestingReverseWorkPriorityDict is the reverse-lookup dictionary for +// WorkPriorityDict, for use in tests. +var TestingReverseWorkPriorityDict map[string]WorkPriority + +func init() { + TestingReverseWorkPriorityDict = make(map[string]WorkPriority) + for k, v := range WorkPriorityDict { + TestingReverseWorkPriorityDict[v] = k + } +} + // WorkClass represents the class of work, which is defined entirely by its // WorkPriority. Namely, everything less than NormalPri is defined to be // "Elastic", while everything above and including NormalPri is considered diff --git a/pkg/util/admission/elastic_cpu_work_handle.go b/pkg/util/admission/elastic_cpu_work_handle.go index 85c5561304b5..42e60594419c 100644 --- a/pkg/util/admission/elastic_cpu_work_handle.go +++ b/pkg/util/admission/elastic_cpu_work_handle.go @@ -151,9 +151,8 @@ func TestingNewElasticCPUHandle() *ElasticCPUWorkHandle { return newElasticCPUWorkHandle(420 * time.Hour) // use a very high allotment } -// TestingNewElasticCPUWithCallback constructs an -// ElascticCPUWorkHandle with a testing override for the behaviour of -// OverLimit(). +// TestingNewElasticCPUHandleWithCallback constructs an ElasticCPUWorkHandle +// with a testing override for the behaviour of OverLimit(). func TestingNewElasticCPUHandleWithCallback(cb func() (bool, time.Duration)) *ElasticCPUWorkHandle { h := TestingNewElasticCPUHandle() h.testingOverrideOverLimit = cb diff --git a/pkg/util/admission/grant_coordinator.go b/pkg/util/admission/grant_coordinator.go index edf2b24b28e9..582bc5d6c7d0 100644 --- a/pkg/util/admission/grant_coordinator.go +++ b/pkg/util/admission/grant_coordinator.go @@ -61,6 +61,7 @@ type StoreGrantCoordinators struct { // api. 
numStores int pebbleMetricsProvider PebbleMetricsProvider + onLogEntryAdmitted OnLogEntryAdmitted closeCh chan struct{} disableTickerForTesting bool @@ -157,7 +158,7 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID) // This is IO work, so override the usesTokens value. opts.usesTokens = true // TODO(sumeer): add per-store WorkQueue state for debug.zip and db console. - granters := [admissionpb.NumWorkClasses]granterWithStoreWriteDone{ + granters := [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted{ &kvStoreTokenChildGranter{ workClass: admissionpb.RegularWorkClass, parent: kvg, @@ -168,7 +169,17 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID) }, } - storeReq := sgc.makeStoreRequesterFunc(sgc.ambientCtx, storeID, granters, sgc.settings, sgc.workQueueMetrics, opts, nil) + storeReq := sgc.makeStoreRequesterFunc( + sgc.ambientCtx, + storeID, + granters, + sgc.settings, + sgc.workQueueMetrics, + opts, + nil, /* knobs */ + sgc.onLogEntryAdmitted, + &coord.mu.Mutex, + ) coord.queues[KVWork] = storeReq requesters := storeReq.getRequesters() kvg.regularRequester = requesters[admissionpb.RegularWorkClass] @@ -336,8 +347,9 @@ type makeRequesterFunc func( metrics *WorkQueueMetrics, opts workQueueOptions) requester type makeStoreRequesterFunc func( - _ log.AmbientContext, storeID roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, + _ log.AmbientContext, storeID roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted, settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, knobs *TestingKnobs, + onLogEntryAdmitted OnLogEntryAdmitted, coordMu *syncutil.Mutex, ) storeRequester // NewGrantCoordinators constructs GrantCoordinators and WorkQueues for a @@ -356,13 +368,17 @@ type makeStoreRequesterFunc func( // GrantCoordinators since they are not trying to control CPU usage, so we turn // off grant chaining in those coordinators. func NewGrantCoordinators( - ambientCtx log.AmbientContext, st *cluster.Settings, opts Options, registry *metric.Registry, + ambientCtx log.AmbientContext, + st *cluster.Settings, + opts Options, + registry *metric.Registry, + onLogEntryAdmitted OnLogEntryAdmitted, ) GrantCoordinators { metrics := makeGrantCoordinatorMetrics() registry.AddMetricStruct(metrics) return GrantCoordinators{ - Stores: makeStoresGrantCoordinators(ambientCtx, opts, st, metrics, registry), + Stores: makeStoresGrantCoordinators(ambientCtx, opts, st, metrics, registry, onLogEntryAdmitted), Regular: makeRegularGrantCoordinator(ambientCtx, opts, st, metrics, registry), Elastic: makeElasticGrantCoordinator(ambientCtx, st, registry), } @@ -399,6 +415,7 @@ func makeStoresGrantCoordinators( st *cluster.Settings, metrics GrantCoordinatorMetrics, registry *metric.Registry, + onLogEntryAdmitted OnLogEntryAdmitted, ) *StoreGrantCoordinators { // These metrics are shared across all stores and broken down by priority for // the common priorities. 
@@ -417,6 +434,7 @@ func makeStoresGrantCoordinators( makeStoreRequesterFunc: makeStoreRequester, kvIOTokensExhaustedDuration: metrics.KVIOTokensExhaustedDuration, workQueueMetrics: storeWorkQueueMetrics, + onLogEntryAdmitted: onLogEntryAdmitted, } return storeCoordinators } diff --git a/pkg/util/admission/granter.go b/pkg/util/admission/granter.go index 8787020d0886..a117e6141861 100644 --- a/pkg/util/admission/granter.go +++ b/pkg/util/admission/granter.go @@ -324,7 +324,7 @@ type kvStoreTokenChildGranter struct { parent *kvStoreTokenGranter } -var _ granterWithStoreWriteDone = &kvStoreTokenChildGranter{} +var _ granterWithStoreReplicatedWorkAdmitted = &kvStoreTokenChildGranter{} var _ granter = &kvStoreTokenChildGranter{} // grantKind implements granter. @@ -352,11 +352,23 @@ func (cg *kvStoreTokenChildGranter) continueGrantChain(grantChainID grantChainID // Ignore since grant chains are not used for store tokens. } -// storeWriteDone implements granterWithStoreWriteDone. +// storeWriteDone implements granterWithStoreReplicatedWorkAdmitted. func (cg *kvStoreTokenChildGranter) storeWriteDone( originalTokens int64, doneInfo StoreWorkDoneInfo, ) (additionalTokens int64) { - return cg.parent.storeWriteDone(cg.workClass, originalTokens, doneInfo) + cg.parent.coord.mu.Lock() + defer cg.parent.coord.mu.Unlock() + // NB: the token/metric adjustments we want to make here are the same as we + // want to make through the storeReplicatedWorkAdmittedLocked, so we (ab)use it. + return cg.parent.storeReplicatedWorkAdmittedLocked( + cg.workClass, originalTokens, storeReplicatedWorkAdmittedInfo(doneInfo)) +} + +// storeReplicatedWorkAdmitted implements granterWithStoreReplicatedWorkAdmitted. +func (cg *kvStoreTokenChildGranter) storeReplicatedWorkAdmittedLocked( + originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo, +) (additionalTokens int64) { + return cg.parent.storeReplicatedWorkAdmittedLocked(cg.workClass, originalTokens, admittedInfo) } func (sg *kvStoreTokenGranter) tryGet(workClass admissionpb.WorkClass, count int64) bool { @@ -522,7 +534,7 @@ func (sg *kvStoreTokenGranter) getDiskTokensUsedAndReset() [admissionpb.NumWorkC } // setAdmittedModelsLocked implements granterWithIOTokens. -func (sg *kvStoreTokenGranter) setAdmittedDoneModels( +func (sg *kvStoreTokenGranter) setLinearModels( l0WriteLM tokensLinearModel, l0IngestLM tokensLinearModel, ingestLM tokensLinearModel, ) { sg.coord.mu.Lock() @@ -532,37 +544,21 @@ func (sg *kvStoreTokenGranter) setAdmittedDoneModels( sg.ingestLM = ingestLM } -// storeWriteDone implements granterWithStoreWriteDone. -func (sg *kvStoreTokenGranter) storeWriteDone( - wc admissionpb.WorkClass, originalTokens int64, doneInfo StoreWorkDoneInfo, +func (sg *kvStoreTokenGranter) storeReplicatedWorkAdmittedLocked( + wc admissionpb.WorkClass, originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo, ) (additionalTokens int64) { - // Normally, we follow the structure of a foo() method calling into a foo() - // method on the GrantCoordinator, which then calls fooLocked() on the - // kvStoreTokenGranter. For example, returnGrant follows this structure. - // This allows the GrantCoordinator to do two things (a) acquire the mu - // before calling into kvStoreTokenGranter, (b) do side-effects, like - // terminating grant chains and doing more grants after the call into the - // fooLocked() method. 
- // For storeWriteDone we don't bother with this structure involving the - // GrantCoordinator (which has served us well across various methods and - // various granter implementations), since the decision on when the - // GrantCoordinator should call tryGrantLocked is more complicated. And since this - // storeWriteDone is unique to the kvStoreTokenGranter (and not implemented - // by other granters) this approach seems acceptable. - // Reminder: coord.mu protects the state in the kvStoreTokenGranter. - sg.coord.mu.Lock() exhaustedFunc := func() bool { return sg.coordMu.availableIOTokens <= 0 || (wc == admissionpb.ElasticWorkClass && sg.coordMu.elasticDiskBWTokensAvailable <= 0) } wasExhausted := exhaustedFunc() - actualL0WriteTokens := sg.l0WriteLM.applyLinearModel(doneInfo.WriteBytes) - actualL0IngestTokens := sg.l0IngestLM.applyLinearModel(doneInfo.IngestedBytes) + actualL0WriteTokens := sg.l0WriteLM.applyLinearModel(admittedInfo.WriteBytes) + actualL0IngestTokens := sg.l0IngestLM.applyLinearModel(admittedInfo.IngestedBytes) actualL0Tokens := actualL0WriteTokens + actualL0IngestTokens additionalL0TokensNeeded := actualL0Tokens - originalTokens sg.subtractTokensLocked(additionalL0TokensNeeded, false) - actualIngestTokens := sg.ingestLM.applyLinearModel(doneInfo.IngestedBytes) + actualIngestTokens := sg.ingestLM.applyLinearModel(admittedInfo.IngestedBytes) additionalDiskBWTokensNeeded := (actualL0WriteTokens + actualIngestTokens) - originalTokens if wc == admissionpb.ElasticWorkClass { sg.coordMu.elasticDiskBWTokensAvailable -= additionalDiskBWTokensNeeded @@ -574,7 +570,6 @@ func (sg *kvStoreTokenGranter) storeWriteDone( sg.coord.tryGrantLocked() } } - sg.coord.mu.Unlock() // For multi-tenant fairness accounting, we choose to ignore disk bandwidth // tokens. 
Ideally, we'd have multiple resource dimensions for the fairness // decisions, but we don't necessarily need something more sophisticated diff --git a/pkg/util/admission/granter_test.go b/pkg/util/admission/granter_test.go index c085950b12ce..9fc6555dfdd9 100644 --- a/pkg/util/admission/granter_test.go +++ b/pkg/util/admission/granter_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/pebble" "github.com/stretchr/testify/require" @@ -97,7 +98,7 @@ func TestGranterBasic(t *testing.T) { return req } delayForGrantChainTermination = 0 - coords := NewGrantCoordinators(ambientCtx, settings, opts, registry) + coords := NewGrantCoordinators(ambientCtx, settings, opts, registry, &NoopOnLogEntryAdmitted{}) defer coords.Close() coord = coords.Regular return flushAndReset() @@ -109,8 +110,9 @@ func TestGranterBasic(t *testing.T) { storeCoordinators := &StoreGrantCoordinators{ settings: settings, makeStoreRequesterFunc: func( - ambientCtx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, + ambientCtx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted, settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, knobs *TestingKnobs, + _ OnLogEntryAdmitted, _ *syncutil.Mutex, ) storeRequester { makeTestRequester := func(wc admissionpb.WorkClass) *testRequester { req := &testRequester{ @@ -148,7 +150,7 @@ func TestGranterBasic(t *testing.T) { kvStoreGranter := coord.granters[KVWork].(*kvStoreTokenGranter) // Use the same model for all 3 kinds of models. 
tlm := tokensLinearModel{multiplier: 0.5, constant: 50} - kvStoreGranter.setAdmittedDoneModels(tlm, tlm, tlm) + kvStoreGranter.setLinearModels(tlm, tlm, tlm) return flushAndReset() case "set-has-waiting-requests": @@ -232,7 +234,7 @@ func TestGranterBasic(t *testing.T) { var origTokens, writeBytes int d.ScanArgs(t, "orig-tokens", &origTokens) d.ScanArgs(t, "write-bytes", &writeBytes) - requesters[scanWorkKind(t, d)].granter.(granterWithStoreWriteDone).storeWriteDone( + requesters[scanWorkKind(t, d)].granter.(granterWithStoreReplicatedWorkAdmitted).storeWriteDone( int64(origTokens), StoreWorkDoneInfo{WriteBytes: int64(writeBytes)}) coord.testingTryGrant() return flushAndReset() @@ -274,8 +276,8 @@ func TestStoreCoordinators(t *testing.T) { opts := Options{ makeRequesterFunc: makeRequesterFunc, makeStoreRequesterFunc: func( - ctx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, - settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, _ *TestingKnobs) storeRequester { + ctx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted, + settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, _ *TestingKnobs, _ OnLogEntryAdmitted, _ *syncutil.Mutex) storeRequester { reqReg := makeRequesterFunc(ctx, KVWork, granters[admissionpb.RegularWorkClass], settings, metrics, opts) reqElastic := makeRequesterFunc(ctx, KVWork, granters[admissionpb.ElasticWorkClass], settings, metrics, opts) str := &storeTestRequester{} @@ -286,7 +288,7 @@ func TestStoreCoordinators(t *testing.T) { return str }, } - coords := NewGrantCoordinators(ambientCtx, settings, opts, registry) + coords := NewGrantCoordinators(ambientCtx, settings, opts, registry, &NoopOnLogEntryAdmitted{}) // There is only 1 KVWork requester at this point in initialization, for the // Regular GrantCoordinator. 
require.Equal(t, 1, len(requesters)) diff --git a/pkg/util/admission/io_load_listener.go b/pkg/util/admission/io_load_listener.go index f7b65d20ca21..028ad32f2e38 100644 --- a/pkg/util/admission/io_load_listener.go +++ b/pkg/util/admission/io_load_listener.go @@ -416,8 +416,8 @@ func (io *ioLoadListener) adjustTokens(ctx context.Context, metrics StoreMetrics io.copyAuxEtcFromPerWorkEstimator() requestEstimates := io.perWorkTokenEstimator.getStoreRequestEstimatesAtAdmission() io.kvRequester.setStoreRequestEstimates(requestEstimates) - l0WriteLM, l0IngestLM, ingestLM := io.perWorkTokenEstimator.getModelsAtAdmittedDone() - io.kvGranter.setAdmittedDoneModels(l0WriteLM, l0IngestLM, ingestLM) + l0WriteLM, l0IngestLM, ingestLM := io.perWorkTokenEstimator.getModelsAtDone() + io.kvGranter.setLinearModels(l0WriteLM, l0IngestLM, ingestLM) if _, overloaded := io.ioThreshold.Score(); overloaded || io.aux.doLogFlush || io.elasticDiskBWTokens != unlimitedTokens { log.Infof(ctx, "IO overload: %s", io.adjustTokensResult) @@ -433,7 +433,7 @@ func (io *ioLoadListener) copyAuxEtcFromPerWorkEstimator() { io.adjustTokensResult.aux.perWorkTokensAux = io.perWorkTokenEstimator.aux requestEstimates := io.perWorkTokenEstimator.getStoreRequestEstimatesAtAdmission() io.adjustTokensResult.requestEstimates = requestEstimates - l0WriteLM, l0IngestLM, ingestLM := io.perWorkTokenEstimator.getModelsAtAdmittedDone() + l0WriteLM, l0IngestLM, ingestLM := io.perWorkTokenEstimator.getModelsAtDone() io.adjustTokensResult.l0WriteLM = l0WriteLM io.adjustTokensResult.l0IngestLM = l0IngestLM io.adjustTokensResult.ingestLM = ingestLM diff --git a/pkg/util/admission/io_load_listener_test.go b/pkg/util/admission/io_load_listener_test.go index b10507d9cd7d..2f2bfc0cc425 100644 --- a/pkg/util/admission/io_load_listener_test.go +++ b/pkg/util/admission/io_load_listener_test.go @@ -370,7 +370,7 @@ func (g *testGranterWithIOTokens) getDiskTokensUsedAndReset() [admissionpb.NumWo return g.diskBandwidthTokensUsed } -func (g *testGranterWithIOTokens) setAdmittedDoneModels( +func (g *testGranterWithIOTokens) setLinearModels( l0WriteLM tokensLinearModel, l0IngestLM tokensLinearModel, ingestLM tokensLinearModel, ) { fmt.Fprintf(&g.buf, "setAdmittedDoneModelsLocked: l0-write-lm: ") @@ -409,7 +409,7 @@ func (g *testGranterNonNegativeTokens) getDiskTokensUsedAndReset() [admissionpb. 
return [admissionpb.NumWorkClasses]int64{} } -func (g *testGranterNonNegativeTokens) setAdmittedDoneModels( +func (g *testGranterNonNegativeTokens) setLinearModels( l0WriteLM tokensLinearModel, l0IngestLM tokensLinearModel, ingestLM tokensLinearModel, ) { require.LessOrEqual(g.t, 0.5, l0WriteLM.multiplier) diff --git a/pkg/util/admission/replicated_write_admission_test.go b/pkg/util/admission/replicated_write_admission_test.go index d25adb9895bb..4d1e368922dd 100644 --- a/pkg/util/admission/replicated_write_admission_test.go +++ b/pkg/util/admission/replicated_write_admission_test.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/datadriven" @@ -112,14 +113,15 @@ func TestReplicatedWriteAdmission(t *testing.T) { printTrimmedBytes(originalTokens), rwi.RangeID, rwi.Origin, rwi.LogPosition, ingested) }, } + var mockCoordMu syncutil.Mutex storeWorkQueue = makeStoreWorkQueue( log.MakeTestingAmbientContext(tracing.NewTracer()), roachpb.StoreID(1), - [admissionpb.NumWorkClasses]granterWithStoreWriteDone{ + [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted{ tg[admissionpb.RegularWorkClass], tg[admissionpb.ElasticWorkClass], }, - st, metrics, opts, knobs, + st, metrics, opts, knobs, &NoopOnLogEntryAdmitted{}, &mockCoordMu, ).(*StoreWorkQueue) tg[admissionpb.RegularWorkClass].r = storeWorkQueue.getRequesters()[admissionpb.RegularWorkClass] tg[admissionpb.ElasticWorkClass].r = storeWorkQueue.getRequesters()[admissionpb.ElasticWorkClass] @@ -137,7 +139,7 @@ func TestReplicatedWriteAdmission(t *testing.T) { // Parse pri=. d.ScanArgs(t, "pri", &arg) - pri, found := reverseWorkPriorityDict[arg] + pri, found := admissionpb.TestingReverseWorkPriorityDict[arg] require.True(t, found) // Parse size=. @@ -369,15 +371,6 @@ func printWorkQueue(q *WorkQueue) string { // create-time= is relative to this time. 
var tzero = timeutil.Unix(0, 0) -var reverseWorkPriorityDict map[string]admissionpb.WorkPriority - -func init() { - reverseWorkPriorityDict = make(map[string]admissionpb.WorkPriority) - for k, v := range admissionpb.WorkPriorityDict { - reverseWorkPriorityDict[v] = k - } -} - type testReplicatedWriteGranter struct { t *testing.T wc admissionpb.WorkClass @@ -387,7 +380,7 @@ type testReplicatedWriteGranter struct { tokens int64 } -var _ granterWithStoreWriteDone = &testReplicatedWriteGranter{} +var _ granterWithStoreReplicatedWorkAdmitted = &testReplicatedWriteGranter{} func newTestReplicatedWriteGranter( t *testing.T, wc admissionpb.WorkClass, buf *builderWithMu, @@ -445,3 +438,10 @@ func (tg *testReplicatedWriteGranter) storeWriteDone( tg.tokens -= originalTokens return 0 } + +func (tg *testReplicatedWriteGranter) storeReplicatedWorkAdmittedLocked( + originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo, +) (additionalTokens int64) { + tg.tokens -= originalTokens + return 0 +} diff --git a/pkg/util/admission/store_token_estimation.go b/pkg/util/admission/store_token_estimation.go index 2e2eca842b8e..ea3b83edb863 100644 --- a/pkg/util/admission/store_token_estimation.go +++ b/pkg/util/admission/store_token_estimation.go @@ -12,6 +12,12 @@ package admission import "github.com/cockroachdb/pebble" +// TODO(irfansharif): This comment is a bit stale with replication admission +// control where admission is asynchronous. AC is informed of the write when +// it's being physically done, so we know its size then. We don't need upfront +// estimates anymore. The AdmittedWorkDone interface and surrounding types +// (StoreWorkDoneInfo for ex.) are no longer central. +// // The logic in this file deals with token estimation for a store write in two // situations: (a) at admission time, (b) when the admitted work is done. At // (a) we have no information provided about the work size (NB: this choice is @@ -105,7 +111,13 @@ const ingestMultiplierMin = 0.5 const ingestMultiplierMax = 1.5 type storePerWorkTokenEstimator struct { - atAdmissionWorkTokens int64 + atAdmissionWorkTokens int64 + + // TODO(irfansharif): The linear model fitters below are actually not used + // for upfront per-work token estimation. They're used in the granter to + // figure out the rate of tokens to produce. This code organization is + // confusing -- rename the type? 
+ atDoneL0WriteTokensLinearModel tokensLinearModelFitter atDoneL0IngestTokensLinearModel tokensLinearModelFitter // Unlike the models above that model bytes into L0, this model computes all @@ -238,7 +250,7 @@ func (e *storePerWorkTokenEstimator) getStoreRequestEstimatesAtAdmission() store return storeRequestEstimates{writeTokens: e.atAdmissionWorkTokens} } -func (e *storePerWorkTokenEstimator) getModelsAtAdmittedDone() ( +func (e *storePerWorkTokenEstimator) getModelsAtDone() ( l0WriteLM tokensLinearModel, l0IngestLM tokensLinearModel, ingestLM tokensLinearModel, diff --git a/pkg/util/admission/store_token_estimation_test.go b/pkg/util/admission/store_token_estimation_test.go index b2898f203b46..6f65c3da8894 100644 --- a/pkg/util/admission/store_token_estimation_test.go +++ b/pkg/util/admission/store_token_estimation_test.go @@ -77,7 +77,7 @@ func TestStorePerWorkTokenEstimator(t *testing.T) { admissionStats.statsToIgnore.Bytes += uint64(ignoreIngestedIntoL0) } estimator.updateEstimates(l0Metrics, cumLSMIngestedBytes, admissionStats) - wL0lm, iL0lm, ilm := estimator.getModelsAtAdmittedDone() + wL0lm, iL0lm, ilm := estimator.getModelsAtDone() require.Equal(t, wL0lm, estimator.atDoneL0WriteTokensLinearModel.smoothedLinearModel) require.Equal(t, iL0lm, estimator.atDoneL0IngestTokensLinearModel.smoothedLinearModel) require.Equal(t, ilm, estimator.atDoneIngestTokensLinearModel.smoothedLinearModel) diff --git a/pkg/util/admission/work_queue.go b/pkg/util/admission/work_queue.go index 1284f8bb7b1e..44c5c0a8cb77 100644 --- a/pkg/util/admission/work_queue.go +++ b/pkg/util/admission/work_queue.go @@ -633,6 +633,7 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err info.ReplicatedWorkInfo, info.RequestedCount, info.CreateTime, + false, /* coordMuLocked */ ) } return true, nil @@ -679,6 +680,7 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err } } } + // Check for cancellation. startTime := q.timeNow() if ctx.Err() != nil { @@ -861,6 +863,7 @@ func (q *WorkQueue) granted(grantChainID grantChainID) int64 { item.replicated, item.requestedCount, item.createTime, + true, /* coordMuLocked */ ) q.metrics.incAdmitted(item.priority) @@ -1672,7 +1675,7 @@ func (m *WorkQueueMetrics) getOrCreate(priority admissionpb.WorkPriority) workQu // necessary to call LoadOrStore here as this could be called concurrently. // It is not called the first Load so that we don't have to unnecessarily // create the metrics. - statPrefix := fmt.Sprintf("%v.%v", m.name, admissionpb.WorkPriorityDict[priority]) + statPrefix := fmt.Sprintf("%v.%v", m.name, priority.String()) val, ok = m.byPriority.LoadOrStore(priority, makeWorkQueueMetricsSingle(statPrefix)) if !ok { m.registry.AddMetricStruct(val) @@ -1766,9 +1769,10 @@ type StoreWriteWorkInfo struct { type StoreWorkQueue struct { storeID roachpb.StoreID q [admissionpb.NumWorkClasses]WorkQueue - // Only calls storeWriteDone. The rest of the interface is used by + // Only calls storeReplicatedWorkAdmittedLocked. The rest of the interface is used by // WorkQueue. 
- granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone + granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted + coordMu *syncutil.Mutex mu struct { syncutil.RWMutex // estimates is used to determine how many tokens are deducted at-admit @@ -1788,6 +1792,7 @@ type StoreWorkQueue struct { stopCh chan struct{} timeSource timeutil.TimeSource settings *cluster.Settings + onLogEntryAdmitted OnLogEntryAdmitted knobs *TestingKnobs } @@ -1883,6 +1888,22 @@ type StoreWorkDoneInfo struct { IngestedBytes int64 } +// storeReplicatedWorkAdmittedInfo provides information about the size of +// replicated work once it's admitted (which happens asynchronously from the +// work itself). This lets us use the underlying linear models for L0 +// {writes,ingests} to deduct an appropriate number of tokens from the granter, +// for the admitted work size. +// +// TODO(irfansharif): This post-admission adjustment of tokens is odd -- when +// the replicated work is being enqueued, we already know its size, so we could +// have applied the linear models upfront and determine what the right # of +// tokens to deduct all at once. We're doing it this way because we've written +// the WorkQueue and granter interactions to be very general, but it can be hard +// to follow. See review discussions over at #97599. It's worth noting that +// there isn't really a lag in the adjustment, so it is harmless from an +// operational perspective of admission control. +type storeReplicatedWorkAdmittedInfo StoreWorkDoneInfo + type onAdmittedReplicatedWork interface { admittedReplicatedWork( tenantID roachpb.TenantID, @@ -1890,7 +1911,10 @@ type onAdmittedReplicatedWork interface { rwi ReplicatedWorkInfo, requestedTokens int64, createTime int64, + coordMuLocked bool, ) + + // TODO(irfansharif): This coordMuLocked parameter is gross. } var _ onAdmittedReplicatedWork = &StoreWorkQueue{} @@ -1903,6 +1927,7 @@ func (q *StoreWorkQueue) admittedReplicatedWork( rwi ReplicatedWorkInfo, originalTokens int64, createTime int64, // only used in tests + coordMuLocked bool, ) { if !rwi.Enabled { panic("unexpected call to admittedReplicatedWork for work that's not a replicated write") @@ -1911,11 +1936,11 @@ func (q *StoreWorkQueue) admittedReplicatedWork( fn(tenantID, pri, rwi, originalTokens, createTime) } - var storeWorkDoneInfo StoreWorkDoneInfo + var replicatedWorkAdmittedInfo storeReplicatedWorkAdmittedInfo if rwi.Ingested { - storeWorkDoneInfo.IngestedBytes = originalTokens + replicatedWorkAdmittedInfo.IngestedBytes = originalTokens } else { - storeWorkDoneInfo.WriteBytes = originalTokens + replicatedWorkAdmittedInfo.WriteBytes = originalTokens } // We've already used RequestedCount for replicated writes to deduct tokens @@ -1925,19 +1950,55 @@ func (q *StoreWorkQueue) admittedReplicatedWork( // underlying linear models, and we may have under-deducted -- we account // for this below. wc := admissionpb.WorkClassFromPri(pri) - additionalTokensNeeded := q.granters[wc].storeWriteDone(originalTokens, storeWorkDoneInfo) + if !coordMuLocked { + q.coordMu.Lock() + } + additionalTokensNeeded := q.granters[wc].storeReplicatedWorkAdmittedLocked(originalTokens, replicatedWorkAdmittedInfo) + if !coordMuLocked { + q.coordMu.Unlock() + } q.q[wc].adjustTenantTokens(tenantID, additionalTokensNeeded) - // TODO(irfansharif): Dispatch flow token returns here. We want to - // inform (a) the origin node of writes at (b) a given priority, to - // (c) the given range, at (d) the given log position on (e) the - // local store. 
Part of #95563. + // Inform callers of the entry we just admitted. // - _ = rwi.Origin // (a) - _ = pri // (b) - _ = rwi.RangeID // (c) - _ = rwi.LogPosition // (d) - _ = q.storeID // (e) + // TODO(irfansharif): It's bad that we're extending coord.mu's critical + // section to this callback. We can't prevent it when this is happening via + // WorkQueue.granted since it was called while holding coord.mu. We should + // revisit -- one possibility is to add this to a notification queue and + // have a separate goroutine invoke these callbacks (without holding + // coord.mu). We could directly invoke here too if not holding the lock. + q.onLogEntryAdmitted.AdmittedLogEntry( + rwi.Origin, + pri, + q.storeID, + rwi.RangeID, + rwi.LogPosition, + ) +} + +// OnLogEntryAdmitted is used to observe the specific entries (identified by +// rangeID + log position) that were admitted. Since admission control for log +// entries is asynchronous/non-blocking, this allows callers to do requisite +// post-admission bookkeeping. +type OnLogEntryAdmitted interface { + AdmittedLogEntry( + origin roachpb.NodeID, /* node where the entry originated */ + pri admissionpb.WorkPriority, /* admission priority of the entry */ + storeID roachpb.StoreID, /* store on which the entry was admitted */ + rangeID roachpb.RangeID, /* identifying range for the log entry */ + pos LogPosition, /* log position of the entry that was admitted*/ + ) +} + +// NoopOnLogEntryAdmitted is a no-op implementation of the OnLogEntryAdmitted +// interface. +type NoopOnLogEntryAdmitted struct{} + +var _ OnLogEntryAdmitted = &NoopOnLogEntryAdmitted{} + +func (n *NoopOnLogEntryAdmitted) AdmittedLogEntry( + roachpb.NodeID, admissionpb.WorkPriority, roachpb.StoreID, roachpb.RangeID, LogPosition, +) { } // AdmittedWorkDone indicates to the queue that the admitted work has completed. 
@@ -2026,11 +2087,13 @@ func (q *StoreWorkQueue) setStoreRequestEstimates(estimates storeRequestEstimate func makeStoreWorkQueue( ambientCtx log.AmbientContext, storeID roachpb.StoreID, - granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, + granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted, settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, knobs *TestingKnobs, + onLogEntryAdmitted OnLogEntryAdmitted, + coordMu *syncutil.Mutex, ) storeRequester { if knobs == nil { knobs = &TestingKnobs{} @@ -2039,12 +2102,14 @@ func makeStoreWorkQueue( opts.timeSource = timeutil.DefaultTimeSource{} } q := &StoreWorkQueue{ - storeID: storeID, - granters: granters, - knobs: knobs, - stopCh: make(chan struct{}), - timeSource: opts.timeSource, - settings: settings, + coordMu: coordMu, + storeID: storeID, + granters: granters, + knobs: knobs, + stopCh: make(chan struct{}), + timeSource: opts.timeSource, + settings: settings, + onLogEntryAdmitted: onLogEntryAdmitted, } opts.usesAsyncAdmit = true diff --git a/pkg/util/admission/work_queue_test.go b/pkg/util/admission/work_queue_test.go index f4902930be32..66da1387e973 100644 --- a/pkg/util/admission/work_queue_test.go +++ b/pkg/util/admission/work_queue_test.go @@ -67,7 +67,7 @@ type testGranter struct { additionalTokens int64 } -var _ granterWithStoreWriteDone = &testGranter{} +var _ granterWithStoreReplicatedWorkAdmitted = &testGranter{} func (tg *testGranter) grantKind() grantKind { return tg.gk @@ -110,6 +110,14 @@ func (tg *testGranter) storeWriteDone( return tg.additionalTokens } +func (tg *testGranter) storeReplicatedWorkAdmittedLocked( + originalTokens int64, admittedInfo storeReplicatedWorkAdmittedInfo, +) (additionalTokens int64) { + tg.buf.printf("storeReplicatedWorkAdmittedLocked%s: originalTokens %d, admittedBytes(write %d,ingested %d) returning %d", + tg.name, originalTokens, admittedInfo.WriteBytes, admittedInfo.IngestedBytes, tg.additionalTokens) + return tg.additionalTokens +} + type testWork struct { tenantID roachpb.TenantID cancel context.CancelFunc @@ -522,9 +530,13 @@ func TestStoreWorkQueueBasic(t *testing.T) { opts.timeSource = timeutil.NewManualTime(timeutil.FromUnixMicros(0)) opts.disableEpochClosingGoroutine = true st = cluster.MakeTestingClusterSettings() + var mockCoordMu syncutil.Mutex q = makeStoreWorkQueue(log.MakeTestingAmbientContext(tracing.NewTracer()), roachpb.StoreID(1), - [admissionpb.NumWorkClasses]granterWithStoreWriteDone{tg[admissionpb.RegularWorkClass], tg[admissionpb.ElasticWorkClass]}, - st, metrics, opts, nil /* testing knobs */).(*StoreWorkQueue) + [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted{ + tg[admissionpb.RegularWorkClass], + tg[admissionpb.ElasticWorkClass], + }, + st, metrics, opts, nil /* testing knobs */, &NoopOnLogEntryAdmitted{}, &mockCoordMu).(*StoreWorkQueue) tg[admissionpb.RegularWorkClass].r = q.getRequesters()[admissionpb.RegularWorkClass] tg[admissionpb.ElasticWorkClass].r = q.getRequesters()[admissionpb.ElasticWorkClass] wrkMap.resetMap()