Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
115146: importer: support arrays of UDT in some cases r=yuzefovich a=yuzefovich

This commit adjusts the import code to support IMPORT INTO tables with columns typed as arrays of UDTs. This required a couple of minor changes:
- propagating the `SemaCtx` into `ParseDatumStringAs`. Previously, we would always create a fresh one, but now all callers of this method have been adjusted to provide the one they have in scope. This allows us to parse expressions of the form `'Monday'::db.sc.enum_name` where `'Monday'` is a member of an enum.
- implement `importTypeResolver.ResolveType` in the "happy" case. This is used to resolve casts from above to a concrete type.

Note that `ResolveType` implementation is incomplete in the general case. In particular, whenever an import job is created, we _might_ have a set of types available (it appears that this is the case when we're importing into one table - for example, we have logic for importing the whole pgdump, there we can have multiple tables, and set of types won't be available). Whenever we do have a set of types, we can simplify the type resolution to simply match on the name of the type.

However, if we happen to have a table that uses two UDTs with the same name but different schemas, this simplistic resolution won't work, so we still return an error in this case.

Fixes: #112100.

Release note (sql change): CockroachDB now supports IMPORT INTO a table that has columns typed as arrays of user-defined types (like enums). Tables that uses multiple user-defined types with the same name but different schemas are still unsupported.

115674: sql: use background QoS for atomic COPY r=yuzefovich a=yuzefovich

We recently merged a change to make COPY use "background" QoS by default. However, that change was incomplete - it only made it so that we use the "background" QoS only whenever a new txn is started by the copy state machine, and we forgot to make the corresponding update for the initial txn created outside of the state machine, if we're in an implicit txn. This is now fixed.

Note that this initial txn is only used for "atomic" COPY because for non-atomic we always create a fresh txn before each batch.

This commit also adjusts an existing test to verify that the expected QoS is used for all implicit txns.

Epic: None

Release note: None

116300: roachtest: speed up import cancellation r=yuzefovich a=yuzefovich

Previously, when determining the number of TPCH queries we're running during the IMPORT, we used the length of the string, which made it so that we ran queries more than we intended. This is now fixed.

Fixes: #116299.

Epic: None

Release note: None

116302: roachtest: unskip restore/online/tpce/400GB/aws/inc-count=1/nodes=4/cpus=8 r=adityamaru a=msbutler

Epic: none

Release note: none

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Michael Butler <[email protected]>
  • Loading branch information
3 people committed Dec 13, 2023
5 parents c92d216 + 044d04f + 94f1aac + faf2cde + 7fe7a2d commit 4a10b8f
Show file tree
Hide file tree
Showing 27 changed files with 296 additions and 165 deletions.
2 changes: 1 addition & 1 deletion pkg/cli/statement_bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func getExplainCombinations(
}
upperBound := bucket["upper_bound"].(string)
bucketMap[key] = []string{upperBound}
datum, err := rowenc.ParseDatumStringAs(ctx, colType, upperBound, &evalCtx)
datum, err := rowenc.ParseDatumStringAs(ctx, colType, upperBound, &evalCtx, nil /* semaCtx */)
if err != nil {
panic("failed parsing datum string as " + datum.String() + " " + err.Error())
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/cmd/roachtest/tests/import_cancellation.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func runImportCancellation(ctx context.Context, t test.Test, c cluster.Cluster)
// for Scale Factor 1, so since we're using Scale Factor 100 some TPCH
// queries are expected to return different results - skip those.
var queries string
var numQueries int
for i := 1; i <= tpch.NumQueries; i++ {
switch i {
case 11, 13, 16, 18, 20:
Expand All @@ -155,11 +156,12 @@ func runImportCancellation(ctx context.Context, t test.Test, c cluster.Cluster)
queries += ","
}
queries += strconv.Itoa(i)
numQueries++
}
}
// maxOps flag will allow us to exit the workload once all the queries
// were run 2 times.
maxOps := 2 * len(queries)
maxOps := 2 * numQueries
cmd := fmt.Sprintf(
"./workload run tpch --db=csv --concurrency=1 --queries=%s --max-ops=%d {pgurl%s} "+
"--enable-checks=true", queries, maxOps, c.All())
Expand Down
187 changes: 88 additions & 99 deletions pkg/cmd/roachtest/tests/online_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ func registerOnlineRestore(r registry.Registry) {
timeout: 5 * time.Hour,
suites: registry.Suites(registry.Nightly),
restoreUptoIncremental: 1,
skip: "used for ad hoc experiments",
},
{
// 8TB tpce Online Restore
Expand All @@ -60,111 +59,101 @@ func registerOnlineRestore(r registry.Registry) {
},
} {
for _, runOnline := range []bool{true, false} {
for _, runWorkload := range []bool{true, false} {
sp := sp
runOnline := runOnline
runWorkload := runWorkload
sp := sp
runOnline := runOnline

if runOnline {
sp.namePrefix = "online/"
} else {
sp.namePrefix = "offline/"
sp.skip = "used for ad hoc experiments"
}
sp.namePrefix = sp.namePrefix + fmt.Sprintf("workload=%t", runWorkload)
if runOnline {
sp.namePrefix = "online"
} else {
sp.namePrefix = "offline"
sp.skip = "used for ad hoc experiments"
}

sp.initTestName()
r.Add(registry.TestSpec{
Name: sp.testName,
Owner: registry.OwnerDisasterRecovery,
Benchmark: true,
Cluster: sp.hardware.makeClusterSpecs(r, sp.backup.cloud),
Timeout: sp.timeout,
// These tests measure performance. To ensure consistent perf,
// disable metamorphic encryption.
EncryptionSupport: registry.EncryptionAlwaysDisabled,
CompatibleClouds: registry.Clouds(sp.backup.cloud),
Suites: sp.suites,
Skip: sp.skip,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
sp.initTestName()
r.Add(registry.TestSpec{
Name: sp.testName,
Owner: registry.OwnerDisasterRecovery,
Benchmark: true,
Cluster: sp.hardware.makeClusterSpecs(r, sp.backup.cloud),
Timeout: sp.timeout,
// These tests measure performance. To ensure consistent perf,
// disable metamorphic encryption.
EncryptionSupport: registry.EncryptionAlwaysDisabled,
CompatibleClouds: registry.Clouds(sp.backup.cloud),
Suites: sp.suites,
Skip: sp.skip,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {

testStartTime := timeutil.Now()
testStartTime := timeutil.Now()

rd := makeRestoreDriver(t, c, sp)
rd.prepareCluster(ctx)
rd := makeRestoreDriver(t, c, sp)
rd.prepareCluster(ctx)

m := c.NewMonitor(ctx, sp.hardware.getCRDBNodes())
m.Go(func(ctx context.Context) error {
db, err := rd.c.ConnE(ctx, rd.t.L(), rd.c.Node(1)[0])
if err != nil {
return err
}
defer db.Close()
if _, err := db.Exec("SET CLUSTER SETTING kv.queue.process.guaranteed_time_budget='1h'"); err != nil {
return err
}
if _, err := db.Exec("SET CLUSTER SETTING kv.snapshot_receiver.excise.enabled=true"); err != nil {
return err
}
if _, err := db.Exec("SET CLUSTER SETTING sql.stats.automatic_collection.enabled=false"); err != nil {
return err
}
opts := ""
if runOnline {
opts = "WITH EXPERIMENTAL DEFERRED COPY"
}
restoreCmd := rd.restoreCmd("DATABASE tpce", opts)
t.L().Printf("Running %s", restoreCmd)
if _, err = db.ExecContext(ctx, restoreCmd); err != nil {
return err
}
return nil
})
m.Wait()
m := c.NewMonitor(ctx, sp.hardware.getCRDBNodes())
m.Go(func(ctx context.Context) error {
db, err := rd.c.ConnE(ctx, rd.t.L(), rd.c.Node(1)[0])
if err != nil {
return err
}
defer db.Close()
if _, err := db.Exec("SET CLUSTER SETTING kv.queue.process.guaranteed_time_budget='1h'"); err != nil {
return err
}
if _, err := db.Exec("SET CLUSTER SETTING kv.snapshot_receiver.excise.enabled=true"); err != nil {
return err
}
if _, err := db.Exec("SET CLUSTER SETTING sql.stats.automatic_collection.enabled=false"); err != nil {
return err
}
opts := ""
if runOnline {
opts = "WITH EXPERIMENTAL DEFERRED COPY"
}
restoreCmd := rd.restoreCmd("DATABASE tpce", opts)
t.L().Printf("Running %s", restoreCmd)
if _, err = db.ExecContext(ctx, restoreCmd); err != nil {
return err
}
return nil
})
m.Wait()

workloadCtx, workloadCancel := context.WithCancel(ctx)
mDownload := c.NewMonitor(workloadCtx, sp.hardware.getCRDBNodes())
// TODO(msbutler): add foreground query latency tracker
workloadCtx, workloadCancel := context.WithCancel(ctx)
mDownload := c.NewMonitor(workloadCtx, sp.hardware.getCRDBNodes())
// TODO(msbutler): add foreground query latency tracker

mDownload.Go(func(ctx context.Context) error {
if !runWorkload {
fmt.Printf("roachtest configured to skip running the foreground workload")
return nil
}
err := sp.backup.workload.run(ctx, t, c, sp.hardware)
// We expect the workload to return a context cancelled error because
// the roachtest driver cancels the monitor's context after the download job completes
if err != nil && ctx.Err() == nil {
// Implies the workload context was not cancelled and the workload cmd returned a
// different error.
return errors.Wrapf(err, `Workload context was not cancelled. Error returned by workload cmd`)
}
rd.t.L().Printf("workload successfully finished")
return nil
})
mDownload.Go(func(ctx context.Context) error {
defer workloadCancel()
if runOnline {
return waitForDownloadJob(ctx, c, t.L())
}
if runWorkload {
// If we just completed an offline restore and are running the
// workload, run the workload until we're at most 15 minutes
// away from timing out.
testRuntime := timeutil.Since(testStartTime)
workloadDuration := sp.timeout - testRuntime
if workloadDuration > time.Minute*15 {
workloadDuration = workloadDuration - time.Minute*15
}
fmt.Printf("let workload run for %.2f minutes", workloadDuration.Minutes())
time.Sleep(workloadDuration)
}
return nil
})
mDownload.Wait()
},
})
}
mDownload.Go(func(ctx context.Context) error {
err := sp.backup.workload.run(ctx, t, c, sp.hardware)
// We expect the workload to return a context cancelled error because
// the roachtest driver cancels the monitor's context after the download job completes
if err != nil && ctx.Err() == nil {
// Implies the workload context was not cancelled and the workload cmd returned a
// different error.
return errors.Wrapf(err, `Workload context was not cancelled. Error returned by workload cmd`)
}
rd.t.L().Printf("workload successfully finished")
return nil
})
mDownload.Go(func(ctx context.Context) error {
defer workloadCancel()
if runOnline {
return waitForDownloadJob(ctx, c, t.L())
}
// If we just completed an offline restore and are running the
// workload, run the workload until we're at most 15 minutes
// away from timing out.
testRuntime := timeutil.Since(testStartTime)
workloadDuration := sp.timeout - testRuntime
if workloadDuration > time.Minute*15 {
workloadDuration = workloadDuration - time.Minute*15
}
fmt.Printf("let workload run for %.2f minutes", workloadDuration.Minutes())
time.Sleep(workloadDuration)
return nil
})
mDownload.Wait()
},
})
}
}
}
Expand Down
14 changes: 12 additions & 2 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2759,7 +2759,7 @@ func (ex *connExecutor) execCopyOut(
) (retEv fsm.Event, retPayload fsm.EventPayload) {
// First handle connExecutor state transitions.
if _, isNoTxn := ex.machine.CurState().(stateNoTxn); isNoTxn {
return ex.beginImplicitTxn(ctx, cmd.ParsedStmt.AST)
return ex.beginImplicitTxn(ctx, cmd.ParsedStmt.AST, ex.copyQualityOfService())
} else if _, isAbortedTxn := ex.machine.CurState().(stateAborted); isAbortedTxn {
return ex.makeErrEvent(sqlerrors.NewTransactionAbortedError("" /* customMsg */), cmd.ParsedStmt.AST)
}
Expand Down Expand Up @@ -2983,7 +2983,7 @@ func (ex *connExecutor) execCopyIn(
) (retEv fsm.Event, retPayload fsm.EventPayload) {
// First handle connExecutor state transitions.
if _, isNoTxn := ex.machine.CurState().(stateNoTxn); isNoTxn {
return ex.beginImplicitTxn(ctx, cmd.ParsedStmt.AST)
return ex.beginImplicitTxn(ctx, cmd.ParsedStmt.AST, ex.copyQualityOfService())
} else if _, isAbortedTxn := ex.machine.CurState().(stateAborted); isAbortedTxn {
return ex.makeErrEvent(sqlerrors.NewTransactionAbortedError("" /* customMsg */), cmd.ParsedStmt.AST)
}
Expand Down Expand Up @@ -3515,6 +3515,16 @@ func (ex *connExecutor) QualityOfService() sessiondatapb.QoSLevel {
return ex.sessionData().DefaultTxnQualityOfService
}

// copyQualityOfService returns the QoSLevel session setting for COPY if the
// session settings are populated, otherwise the background QoSLevel.
func (ex *connExecutor) copyQualityOfService() sessiondatapb.QoSLevel {
// TODO(yuzefovich): investigate whether we need this check here and above.
if ex.sessionData() == nil {
return sessiondatapb.UserLow
}
return ex.sessionData().CopyTxnQualityOfService
}

func (ex *connExecutor) readWriteModeWithSessionDefault(
mode tree.ReadWriteMode,
) tree.ReadWriteMode {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2542,7 +2542,7 @@ func (ex *connExecutor) execStmtInNoTxnState(
// an AOST clause. In these cases the clause is evaluated and applied
// when the command is executed again.
func (ex *connExecutor) beginImplicitTxn(
ctx context.Context, ast tree.Statement,
ctx context.Context, ast tree.Statement, qos sessiondatapb.QoSLevel,
) (fsm.Event, fsm.EventPayload) {
// NB: Implicit transactions are created with the session's default
// historical timestamp even though the statement itself might contain
Expand All @@ -2560,7 +2560,7 @@ func (ex *connExecutor) beginImplicitTxn(
sqlTs,
historicalTs,
ex.transitionCtx,
ex.QualityOfService(),
qos,
ex.txnIsolationLevelToKV(ctx, tree.UnspecifiedIsolation),
ex.omitInRangefeeds(),
)
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/conn_executor_prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (ex *connExecutor) execPrepare(
// information about the previous transaction. We expect to execute
// this command in NoTxn.
if _, ok := parseCmd.AST.(*tree.ShowCommitTimestamp); !ok {
return ex.beginImplicitTxn(ctx, parseCmd.AST)
return ex.beginImplicitTxn(ctx, parseCmd.AST, ex.QualityOfService())
}
} else if _, isAbortedTxn := ex.machine.CurState().(stateAborted); isAbortedTxn {
if !ex.isAllowedInAbortedTxn(parseCmd.AST) {
Expand Down Expand Up @@ -383,7 +383,7 @@ func (ex *connExecutor) execBind(
// executing SHOW COMMIT TIMESTAMP as it would destroy the information
// about the previously committed transaction.
if _, ok := ps.AST.(*tree.ShowCommitTimestamp); !ok {
return ex.beginImplicitTxn(ctx, ps.AST)
return ex.beginImplicitTxn(ctx, ps.AST, ex.QualityOfService())
}
} else if _, isAbortedTxn := ex.machine.CurState().(stateAborted); isAbortedTxn {
if !ex.isAllowedInAbortedTxn(ps.AST) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/copy/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_test(
deps = [
"//pkg/base",
"//pkg/cli/clisqlclient",
"//pkg/kv",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/security/securityassets",
Expand All @@ -34,6 +35,7 @@ go_test(
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/admission/admissionpb",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/encoding/csv",
Expand Down
14 changes: 13 additions & 1 deletion pkg/sql/copy/copy_in_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/cli/clisqlclient"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql"
Expand All @@ -35,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
Expand Down Expand Up @@ -502,7 +504,17 @@ func TestCopyFromRetries(t *testing.T) {
var params base.TestServerArgs
var attemptNumber int
params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{
BeforeCopyFromInsert: func() error {
BeforeCopyFromInsert: func(txn *kv.Txn) error {
if !tc.inTxn {
// When we're not in an explicit txn, we expect that all
// txns used by the COPY use the background QoS.
if txn.AdmissionHeader().Priority != int32(admissionpb.UserLowPri) {
t.Errorf(
"unexpected QoS level %d (expected %d)",
txn.AdmissionHeader().Priority, admissionpb.UserLowPri,
)
}
}
attemptNumber++
return tc.hook(attemptNumber)
},
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/copy/copy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/apd/v3"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/cli/clisqlclient"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql"
Expand Down Expand Up @@ -569,7 +570,7 @@ func TestLargeDynamicRows(t *testing.T) {
var params base.TestServerArgs
var batchNumber int
params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{
BeforeCopyFromInsert: func() error {
BeforeCopyFromInsert: func(*kv.Txn) error {
batchNumber++
return nil
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/copy_from.go
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,7 @@ func (c *copyMachine) insertRowsInternal(ctx context.Context, finalBatch bool) (
retErr = cleanup(ctx, retErr)
}()
if c.p.ExecCfg().TestingKnobs.BeforeCopyFromInsert != nil {
if err := c.p.ExecCfg().TestingKnobs.BeforeCopyFromInsert(); err != nil {
if err := c.p.ExecCfg().TestingKnobs.BeforeCopyFromInsert(c.txnOpt.txn); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1652,7 +1652,7 @@ type ExecutorTestingKnobs struct {
UseTransactionalDescIDGenerator bool

// BeforeCopyFromInsert, if set, will be called during a COPY FROM insert statement.
BeforeCopyFromInsert func() error
BeforeCopyFromInsert func(txn *kv.Txn) error

// CopyFromInsertRetry, if set, will be called when a COPY FROM insert statement is retried.
CopyFromInsertRetry func() error
Expand Down
Loading

0 comments on commit 4a10b8f

Please sign in to comment.