79967: kv/bulk: parallelize sending SSTs due to range bounds r=dt a=dt

Previously, when the batcher determined that it needed to finish one SST
and send it before starting another, it would wait for that send to complete
before moving on. When flushing a buffer whose data mapped to many ranges,
this meant many serial flushes: e.g. flushing 512MB of data from a buffer
with keys uniformly distributed over a table split into 2000 ranges meant
waiting for roughly 2000 sequential AddSSTable requests. When those requests
were slow, sometimes taking 1s each or more, this became a major bottleneck.

This change switches the batcher to send files that are ended due to a
range boundary asynchronously: it queues the request to send and starts the
next file while the previous one is in flight, as long as memory capacity in
the monitor allows holding the extra file in memory (these async sends could
use up to an entire extra buffer's worth of memory if they all end up
in-flight at once, which they easily could if the receivers are queuing).
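
Below is a minimal, self-contained Go sketch of the pattern (all names here, such as `asyncSender` and `flush`, are invented for illustration and are not the actual sstBatcher code): a bounded budget, standing in for the memory monitor reservation, decides whether a finished file is shipped in the background or sent inline.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// asyncSender sketches the flush pattern: a bounded budget (a semaphore
// standing in for the memory monitor) decides whether a finished file can be
// shipped in the background or must be sent inline.
type asyncSender struct {
	budget chan struct{} // capacity = max files allowed in flight at once
	wg     sync.WaitGroup
}

// send stands in for one AddSSTable round trip.
func (s *asyncSender) send(ctx context.Context, file []byte) error {
	time.Sleep(50 * time.Millisecond)
	fmt.Printf("sent %d bytes\n", len(file))
	return nil
}

// flush ships a file asynchronously if the budget allows holding another
// in-flight file, and falls back to the old blocking behavior otherwise.
func (s *asyncSender) flush(ctx context.Context, file []byte) error {
	select {
	case s.budget <- struct{}{}: // reserve room for one more in-flight file
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			defer func() { <-s.budget }() // release the reservation
			_ = s.send(ctx, file)
		}()
		return nil
	default:
		// No spare capacity: send synchronously, as before this change.
		return s.send(ctx, file)
	}
}

func main() {
	s := &asyncSender{budget: make(chan struct{}, 4)}
	for i := 0; i < 10; i++ {
		_ = s.flush(context.Background(), make([]byte, 1<<20))
	}
	s.wg.Wait() // the real batcher's final flush likewise waits for in-flight sends
}
```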

Release note (performance improvement): Bulk ingestion of unsorted data during IMPORT and schema changes uses a higher level of parallelism to send produced data to the storage layer.


79991: allocator: carve out a package boundary r=irfansharif a=irfansharif

**allocator/storepool: carve out package from kvserver**

This commit tries to pave the way for the allocator itself to be carved
out (storepool probably being a direct dependency, though ideally hidden
behind an interface). It shamelessly exports anything and everything needed
for StorePool to sit outside the kvserver package boundary. It's unfortunate
how tangled up this component is, including in tests (some of which reach
deep into inner mutexes and mutate state). We're not doing anything to
improve the status quo here other than moving the code and perhaps pointing
out the awkwardness.

**allocatorimpl: carve out an allocator package**

Do the same for kvserver/replicastats, a type that's used as an input to
the allocator at various points (this is needed to break the dependency
between allocatorimpl and kvserver). We take the same approach as the last
commit, exporting anything and everything needed to introduce a hard package
boundary.
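
To make the new layout concrete, here is a small sketch of what a call site looks like after the move, based on the updates to pkg/cli/zip_test.go and pkg/cmd/allocsim/main.go further down (the package and function names here are hypothetical):

```go
package allocatortuning // hypothetical package, for illustration only

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// tuneAllocatorForTest shows where these knobs live after the carve-out;
// previously both were reached through the kvserver package.
func tuneAllocatorForTest(ctx context.Context, st *cluster.Settings) {
	// Was: kvserver.TimeUntilStoreDead.Override(ctx, &st.SV, kvserver.TestTimeUntilStoreDead)
	storepool.TimeUntilStoreDead.Override(ctx, &st.SV, storepool.TestTimeUntilStoreDead)
	// Was: kvserver.MinLeaseTransferStatsDuration = 10 * time.Second
	allocatorimpl.MinLeaseTransferStatsDuration = 10 * time.Second
}
```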


80197: ui: move shared graph components to cluster-ui r=jocrl,maryliag a=xinhaoz

Part of #74516

This commit moves shared graph functions and components to the cluster-ui
package. This enables the new bar chart component to share axis utilities
and containers with the older line graph component in db-console.

Release note: None

80335: evalctx: remove some redundant uses of Txn on ExtendedEvalContext  r=otan a=RichardJCai

Part of a series of efforts to remove kv.Txn from EvalContext.

Release note: None
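
The diffs below apply one mechanical pattern: wherever a plan hook only needs the transaction, it now calls the planner's Txn() accessor instead of reaching through ExtendedEvalContext(). A sketch of the shape, as a hypothetical helper (the function and package are invented; the real call sites are inlined in the diffs):

```go
package jobutil // hypothetical package, for illustration only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/jobs"
	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/sql"
)

// loadJob loads a job record in the planner's transaction.
func loadJob(ctx context.Context, p sql.PlanHookState, id jobspb.JobID) (*jobs.Job, error) {
	// Was: p.ExecCfg().JobRegistry.LoadJobWithTxn(ctx, id, p.ExtendedEvalContext().Txn)
	return p.ExecCfg().JobRegistry.LoadJobWithTxn(ctx, id, p.Txn())
}
```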

80349: dev: add example of bench with profiling r=jordanlewis a=jordanlewis

Release note: None

80379: dev: bump version r=dt,erikgrinaker a=rickystewart

Problems with `dev generate cgo` have been [noticed](#79455 (comment));
re-building the `dev` binary resolves them.

Release note: None

Co-authored-by: David Taylor <[email protected]>
Co-authored-by: irfan sharif <[email protected]>
Co-authored-by: Xin Hao Zhang <[email protected]>
Co-authored-by: richardjcai <[email protected]>
Co-authored-by: Jordan Lewis <[email protected]>
Co-authored-by: Ricky Stewart <[email protected]>
7 people committed Apr 22, 2022
7 parents 6537583 + db8cb1f + 4f96b1b + b094c6c + 090d163 + 4ea3eec + 474e7d4 commit 11a0a9f
Showing 124 changed files with 4,578 additions and 4,124 deletions.
2 changes: 1 addition & 1 deletion dev
@@ -3,7 +3,7 @@
set -euo pipefail

# Bump this counter to force rebuilding `dev` on all machines.
DEV_VERSION=29
DEV_VERSION=30

THIS_DIR=$(cd "$(dirname "$0")" && pwd)
BINARY_DIR=$THIS_DIR/bin/dev-versions
3 changes: 3 additions & 0 deletions pkg/BUILD.bazel
@@ -138,6 +138,8 @@ ALL_TESTS = [
"//pkg/kv/kvnemesis:kvnemesis_test",
"//pkg/kv/kvprober:kvprober_test",
"//pkg/kv/kvserver/abortspan:abortspan_test",
"//pkg/kv/kvserver/allocator/allocatorimpl:allocatorimpl_test",
"//pkg/kv/kvserver/allocator/storepool:storepool_test",
"//pkg/kv/kvserver/apply:apply_test",
"//pkg/kv/kvserver/asim:asim_test",
"//pkg/kv/kvserver/batcheval/result:result_test",
@@ -160,6 +162,7 @@
"//pkg/kv/kvserver/raftentry:raftentry_test",
"//pkg/kv/kvserver/rangefeed:rangefeed_test",
"//pkg/kv/kvserver/rditer:rditer_test",
"//pkg/kv/kvserver/replicastats:replicastats_test",
"//pkg/kv/kvserver/reports:reports_test",
"//pkg/kv/kvserver/spanlatch:spanlatch_test",
"//pkg/kv/kvserver/spanset:spanset_test",
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_job.go
@@ -407,7 +407,7 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
if details.URI == "" {
initialDetails := details
backupDetails, m, err := getBackupDetailAndManifest(
ctx, p.ExecCfg(), p.ExtendedEvalContext().Txn, details, p.User(),
ctx, p.ExecCfg(), p.Txn(), details, p.User(),
)
if err != nil {
return err
6 changes: 3 additions & 3 deletions pkg/ccl/backupccl/backup_planning.go
@@ -753,7 +753,7 @@ func backupPlanHook(
return sqlDescIDs
}(),
}
plannerTxn := p.ExtendedEvalContext().Txn
plannerTxn := p.Txn()

if backupStmt.Options.Detached {
// When running inside an explicit transaction, we simply create the job
@@ -798,7 +798,7 @@

// TODO(dt): delete this in 22.2.
backupDetails, backupManifest, err := getBackupDetailAndManifest(
ctx, p.ExecCfg(), p.ExtendedEvalContext().Txn, initialDetails, p.User(),
ctx, p.ExecCfg(), p.Txn(), initialDetails, p.User(),
)
if err != nil {
return err
@@ -811,7 +811,7 @@

// We create the job record in the planner's transaction to ensure that
// the job record creation happens transactionally.
plannerTxn := p.ExtendedEvalContext().Txn
plannerTxn := p.Txn()

// Write backup manifest into a temporary checkpoint file.
// This accomplishes 2 purposes:
14 changes: 7 additions & 7 deletions pkg/ccl/backupccl/create_scheduled_backup.go
@@ -458,7 +458,7 @@ func doCreateBackupSchedules(
inc.Pause()
inc.SetScheduleStatus("Waiting for initial backup to complete")

if err := inc.Create(ctx, ex, p.ExtendedEvalContext().Txn); err != nil {
if err := inc.Create(ctx, ex, p.Txn()); err != nil {
return err
}
if err := emitSchedule(inc, backupNode, destinations, nil, /* incrementalFrom */
@@ -491,20 +491,20 @@
}

// Create the schedule (we need its ID to link dependent schedules below).
if err := full.Create(ctx, ex, p.ExtendedEvalContext().Txn); err != nil {
if err := full.Create(ctx, ex, p.Txn()); err != nil {
return err
}

// If schedule creation has resulted in a full and incremental schedule then
// we update both the schedules with the ID of the other "dependent" schedule.
if incRecurrence != nil {
if err := setDependentSchedule(ctx, ex, fullScheduledBackupArgs, full, inc.ScheduleID(),
p.ExtendedEvalContext().Txn); err != nil {
p.Txn()); err != nil {
return errors.Wrap(err,
"failed to update full schedule with dependent incremental schedule id")
}
if err := setDependentSchedule(ctx, ex, incScheduledBackupArgs, inc, full.ScheduleID(),
p.ExtendedEvalContext().Txn); err != nil {
p.Txn()); err != nil {
return errors.Wrap(err,
"failed to update incremental schedule with dependent full schedule id")
}
@@ -664,7 +664,7 @@ func checkScheduleAlreadyExists(
) (bool, error) {

row, err := p.ExecCfg().InternalExecutor.QueryRowEx(ctx, "check-sched",
p.ExtendedEvalContext().Txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()},
p.Txn(), sessiondata.InternalExecutorOverride{User: security.RootUserName()},
fmt.Sprintf("SELECT count(schedule_name) FROM %s WHERE schedule_name = '%s'",
scheduledjobs.ProdJobSchedulerEnv.ScheduledJobsTableName(), scheduleLabel))

@@ -677,12 +677,12 @@
// dryRunBackup executes backup in dry-run mode: we simply execute backup
// under transaction savepoint, and then rollback to that save point.
func dryRunBackup(ctx context.Context, p sql.PlanHookState, backupNode *tree.Backup) error {
sp, err := p.ExtendedEvalContext().Txn.CreateSavepoint(ctx)
sp, err := p.Txn().CreateSavepoint(ctx)
if err != nil {
return err
}
err = dryRunInvokeBackup(ctx, p, backupNode)
if rollbackErr := p.ExtendedEvalContext().Txn.RollbackToSavepoint(ctx, sp); rollbackErr != nil {
if rollbackErr := p.Txn().RollbackToSavepoint(ctx, sp); rollbackErr != nil {
return rollbackErr
}
return err
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_data_processor.go
@@ -441,7 +441,7 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
if err != nil {
return summary, err
}
defer batcher.Close()
defer batcher.Close(ctx)

var keyScratch, valueScratch []byte

8 changes: 4 additions & 4 deletions pkg/ccl/backupccl/restore_planning.go
@@ -1630,7 +1630,7 @@ func doRestorePlan(
return errors.Errorf("%q option can only be used when restoring a single tenant", restoreOptAsTenant)
}
res, err := p.ExecCfg().InternalExecutor.QueryRow(
ctx, "restore-lookup-tenant", p.ExtendedEvalContext().Txn,
ctx, "restore-lookup-tenant", p.Txn(),
`SELECT active FROM system.tenants WHERE id = $1`, newTenantID.ToUint64(),
)
if err != nil {
@@ -1645,7 +1645,7 @@
} else {
for _, i := range tenants {
res, err := p.ExecCfg().InternalExecutor.QueryRow(
ctx, "restore-lookup-tenant", p.ExtendedEvalContext().Txn,
ctx, "restore-lookup-tenant", p.Txn(),
`SELECT active FROM system.tenants WHERE id = $1`, i.ID,
)
if err != nil {
@@ -1836,7 +1836,7 @@ func doRestorePlan(
// We do not wait for the job to finish.
jobID := p.ExecCfg().JobRegistry.MakeJobID()
_, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
ctx, jr, jobID, p.ExtendedEvalContext().Txn)
ctx, jr, jobID, p.Txn())
if err != nil {
return err
}
@@ -1847,7 +1847,7 @@

// We create the job record in the planner's transaction to ensure that
// the job record creation happens transactionally.
plannerTxn := p.ExtendedEvalContext().Txn
plannerTxn := p.Txn()

// Construct the job and commit the transaction. Perform this work in a
// closure to ensure that the job is cleaned up if an error occurs.
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/alter_changefeed_stmt.go
@@ -65,7 +65,7 @@ func alterChangefeedPlanHook(
}
jobID := jobspb.JobID(tree.MustBeDInt(typedExpr))

job, err := p.ExecCfg().JobRegistry.LoadJobWithTxn(ctx, jobID, p.ExtendedEvalContext().Txn)
job, err := p.ExecCfg().JobRegistry.LoadJobWithTxn(ctx, jobID, p.Txn())
if err != nil {
err = errors.Wrapf(err, `could not load job with job id %d`, jobID)
return err
@@ -142,7 +142,7 @@ func alterChangefeedPlanHook(
newPayload.Description = jobRecord.Description
newPayload.DescriptorIDs = jobRecord.DescriptorIDs

err = p.ExecCfg().JobRegistry.UpdateJobWithTxn(ctx, jobID, p.ExtendedEvalContext().Txn, lockForUpdate, func(
err = p.ExecCfg().JobRegistry.UpdateJobWithTxn(ctx, jobID, p.Txn(), lockForUpdate, func(
txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
) error {
ju.UpdatePayload(&newPayload)
@@ -344,7 +344,7 @@ func generateNewTargets(
k := targetKey{TableID: targetSpec.TableID, FamilyName: tree.Name(targetSpec.FamilyName)}
desc := descResolver.DescByID[targetSpec.TableID].(catalog.TableDescriptor)

tbName, err := getQualifiedTableNameObj(ctx, p.ExecCfg(), p.ExtendedEvalContext().Txn, desc)
tbName, err := getQualifiedTableNameObj(ctx, p.ExecCfg(), p.Txn(), desc)
if err != nil {
return nil, nil, hlc.Timestamp{}, nil, err
}
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -625,7 +625,7 @@ func getTargetsAndTables(
}
} else {
_, qualified := opts[changefeedbase.OptFullTableName]
name, err := getChangefeedTargetName(ctx, td, p.ExecCfg(), p.ExtendedEvalContext().Txn, qualified)
name, err := getChangefeedTargetName(ctx, td, p.ExecCfg(), p.Txn(), qualified)
if err != nil {
return nil, nil, err
}
@@ -935,7 +935,7 @@ func (b *changefeedResumer) handleChangefeedError(
const errorFmt = "job failed (%v) but is being paused because of %s=%s"
errorMessage := fmt.Sprintf(errorFmt, changefeedErr,
changefeedbase.OptOnError, changefeedbase.OptOnErrorPause)
return b.job.PauseRequested(ctx, jobExec.ExtendedEvalContext().Txn, func(ctx context.Context,
return b.job.PauseRequested(ctx, jobExec.Txn(), func(ctx context.Context,
planHookState interface{}, txn *kv.Txn, progress *jobspb.Progress) error {
err := b.OnPauseRequest(ctx, jobExec, txn, progress)
if err != nil {
@@ -197,7 +197,7 @@ func ingestionPlanHook(

jobID := p.ExecCfg().JobRegistry.MakeJobID()
sj, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(ctx, jr,
jobID, p.ExtendedEvalContext().Txn)
jobID, p.Txn())
if err != nil {
return err
}
@@ -331,7 +331,7 @@ func (sip *streamIngestionProcessor) close() {
_ = client.Close()
}
if sip.batcher != nil {
sip.batcher.Close()
sip.batcher.Close(sip.Ctx)
}
if sip.maxFlushRateTimer != nil {
sip.maxFlushRateTimer.Stop()
1 change: 1 addition & 0 deletions pkg/cli/BUILD.bazel
@@ -340,6 +340,7 @@ go_test(
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/allocator/storepool",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/kv/kvserver/loqrecovery/loqrecoverypb",
"//pkg/kv/kvserver/stateloader",
3 changes: 2 additions & 1 deletion pkg/cli/zip_test.go
@@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server"
@@ -439,7 +440,7 @@ func TestPartialZip(t *testing.T) {
// is no risk to see the override bumped due to a gossip update
// because this setting is not otherwise set in the test cluster.
s := tc.Server(0)
kvserver.TimeUntilStoreDead.Override(ctx, &s.ClusterSettings().SV, kvserver.TestTimeUntilStoreDead)
storepool.TimeUntilStoreDead.Override(ctx, &s.ClusterSettings().SV, storepool.TestTimeUntilStoreDead)

// This last case may take a little while to converge. To make this work with datadriven and at the same
// time retain the ability to use the `-rewrite` flag, we use a retry loop within that already checks the
2 changes: 1 addition & 1 deletion pkg/cmd/allocsim/BUILD.bazel
@@ -10,7 +10,7 @@ go_library(
"//pkg/acceptance/localcluster/tc",
"//pkg/cli",
"//pkg/cli/exit",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/allocator/allocatorimpl",
"//pkg/server/serverpb",
"//pkg/util/log",
"//pkg/util/randutil",
4 changes: 2 additions & 2 deletions pkg/cmd/allocsim/main.go
@@ -29,7 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/acceptance/localcluster/tc"
"github.com/cockroachdb/cockroach/pkg/cli"
"github.com/cockroachdb/cockroach/pkg/cli/exit"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
@@ -403,7 +403,7 @@ func handleStart() bool {
// in the few minutes after allocsim starts up causes it to take a long time
// for leases to settle onto other nodes even when requests are skewed heavily
// onto them.
kvserver.MinLeaseTransferStatsDuration = 10 * time.Second
allocatorimpl.MinLeaseTransferStatsDuration = 10 * time.Second

cli.Main()
return true
3 changes: 2 additions & 1 deletion pkg/cmd/dev/bench.go
@@ -30,7 +30,8 @@ func makeBenchCmd(runE func(cmd *cobra.Command, args []string) error) *cobra.Com
Long: `Run the specified benchmarks.`,
Example: `
dev bench pkg/sql/parser --filter=BenchmarkParse
dev bench pkg/bench -f='BenchmarkTracing/1node/scan/trace=off' --count=2 --bench-time=10x --bench-mem`,
dev bench pkg/bench -f='BenchmarkTracing/1node/scan/trace=off' --count=2 --bench-time=10x --bench-mem
dev bench pkg/bench -f='BenchmarkTracing/1node/scan/trace=off' --ignore-cache --test-args='-test.cpuprofile=cpu.out -test.memprofile=mem.out'`,
Args: cobra.MinimumNArgs(0),
RunE: runE,
}
2 changes: 2 additions & 0 deletions pkg/kv/bulk/BUILD.bazel
@@ -24,11 +24,13 @@ go_library(
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/util/admission",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/mon",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
10 changes: 6 additions & 4 deletions pkg/kv/bulk/buffering_adder.go
@@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)
@@ -96,7 +97,7 @@ func MakeBulkAdder(
disallowShadowingBelow: opts.DisallowShadowingBelow,
batchTS: opts.BatchTimestamp,
writeAtBatchTS: opts.WriteAtBatchTimestamp,
stats: ingestionPerformanceStats{sendWaitByStore: make(map[roachpb.StoreID]time.Duration)},
mem: bulkMon.MakeBoundAccount(),
},
timestamp: timestamp,
maxBufferLimit: opts.MaxBufferSize,
@@ -106,6 +107,7 @@
lastFlush: timeutil.Now(),
}

b.sink.mem.Mu = &syncutil.Mutex{}
// At minimum a bulk adder needs enough space to store a buffer of
// curBufferSize, and a subsequent SST of SSTSize in-memory. If the memory
// account is unable to reserve this minimum threshold we cannot continue.
@@ -141,7 +143,7 @@ func (b *BufferingAdder) Close(ctx context.Context) {
log.Infof(ctx, "%s adder closing; ingested nothing", b.name)
}
}
b.sink.Close()
b.sink.Close(ctx)

if b.bulkMon != nil {
b.memAcc.Close(ctx)
@@ -228,7 +230,7 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
b.sink.stats.bufferFlushes++

before := b.sink.stats
beforeSize := b.sink.totalRows.DataSize
beforeSize := b.sink.mu.totalRows.DataSize

beforeSort := timeutil.Now()

@@ -277,7 +279,7 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
b.sink.stats.flushWait += timeutil.Since(beforeFlush)

if log.V(3) {
written := b.sink.totalRows.DataSize - beforeSize
written := b.sink.mu.totalRows.DataSize - beforeSize
files := b.sink.stats.batches - before.batches
dueToSplits := b.sink.stats.batchesDueToRange - before.batchesDueToRange
dueToSize := b.sink.stats.batchesDueToSize - before.batchesDueToSize