From 94f1aacaed1c765e8dfe9f9fbe831a14980ce8aa Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 5 Dec 2023 17:29:52 -0800 Subject: [PATCH 1/5] sql: use background QoS for atomic COPY We recently merged a change to make COPY use "background" QoS by default. However, that change was incomplete - it made it so that we use the "background" QoS only when a new txn is started by the copy state machine, but we forgot to make the corresponding update for the initial txn created outside of the state machine when we're in an implicit txn. This is now fixed. Note that this initial txn is only used for "atomic" COPY because for non-atomic COPY we always create a fresh txn before each batch. This commit also adjusts an existing test to verify that the expected QoS is used for all implicit txns. Release note: None --- pkg/sql/conn_executor.go | 14 ++++++++++++-- pkg/sql/conn_executor_exec.go | 4 ++-- pkg/sql/conn_executor_prepare.go | 4 ++-- pkg/sql/copy/BUILD.bazel | 2 ++ pkg/sql/copy/copy_in_test.go | 14 +++++++++++++- pkg/sql/copy/copy_test.go | 3 ++- pkg/sql/copy_from.go | 2 +- pkg/sql/exec_util.go | 2 +- 8 files changed, 35 insertions(+), 10 deletions(-) diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index c471f6a8301a..7d79e11e0139 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -2759,7 +2759,7 @@ func (ex *connExecutor) execCopyOut( ) (retEv fsm.Event, retPayload fsm.EventPayload) { // First handle connExecutor state transitions. if _, isNoTxn := ex.machine.CurState().(stateNoTxn); isNoTxn { - return ex.beginImplicitTxn(ctx, cmd.ParsedStmt.AST) + return ex.beginImplicitTxn(ctx, cmd.ParsedStmt.AST, ex.copyQualityOfService()) } else if _, isAbortedTxn := ex.machine.CurState().(stateAborted); isAbortedTxn { return ex.makeErrEvent(sqlerrors.NewTransactionAbortedError("" /* customMsg */), cmd.ParsedStmt.AST) } @@ -2983,7 +2983,7 @@ func (ex *connExecutor) execCopyIn( ) (retEv fsm.Event, retPayload fsm.EventPayload) { // First handle connExecutor state transitions. if _, isNoTxn := ex.machine.CurState().(stateNoTxn); isNoTxn { - return ex.beginImplicitTxn(ctx, cmd.ParsedStmt.AST) + return ex.beginImplicitTxn(ctx, cmd.ParsedStmt.AST, ex.copyQualityOfService()) } else if _, isAbortedTxn := ex.machine.CurState().(stateAborted); isAbortedTxn { return ex.makeErrEvent(sqlerrors.NewTransactionAbortedError("" /* customMsg */), cmd.ParsedStmt.AST) } @@ -3515,6 +3515,16 @@ func (ex *connExecutor) QualityOfService() sessiondatapb.QoSLevel { return ex.sessionData().DefaultTxnQualityOfService } +// copyQualityOfService returns the QoSLevel session setting for COPY if the +// session settings are populated, otherwise the background QoSLevel. +func (ex *connExecutor) copyQualityOfService() sessiondatapb.QoSLevel { + // TODO(yuzefovich): investigate whether we need this check here and above. + if ex.sessionData() == nil { + return sessiondatapb.UserLow + } + return ex.sessionData().CopyTxnQualityOfService +} + func (ex *connExecutor) readWriteModeWithSessionDefault( mode tree.ReadWriteMode, ) tree.ReadWriteMode { diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 2bbc957dfcac..67cd97c5d344 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -2532,7 +2532,7 @@ func (ex *connExecutor) execStmtInNoTxnState( // an AOST clause. In these cases the clause is evaluated and applied // when the command is executed again. 
func (ex *connExecutor) beginImplicitTxn( - ctx context.Context, ast tree.Statement, + ctx context.Context, ast tree.Statement, qos sessiondatapb.QoSLevel, ) (fsm.Event, fsm.EventPayload) { // NB: Implicit transactions are created with the session's default // historical timestamp even though the statement itself might contain @@ -2550,7 +2550,7 @@ func (ex *connExecutor) beginImplicitTxn( sqlTs, historicalTs, ex.transitionCtx, - ex.QualityOfService(), + qos, ex.txnIsolationLevelToKV(ctx, tree.UnspecifiedIsolation), ) } diff --git a/pkg/sql/conn_executor_prepare.go b/pkg/sql/conn_executor_prepare.go index ea397ce137bf..837d2e1942f5 100644 --- a/pkg/sql/conn_executor_prepare.go +++ b/pkg/sql/conn_executor_prepare.go @@ -47,7 +47,7 @@ func (ex *connExecutor) execPrepare( // information about the previous transaction. We expect to execute // this command in NoTxn. if _, ok := parseCmd.AST.(*tree.ShowCommitTimestamp); !ok { - return ex.beginImplicitTxn(ctx, parseCmd.AST) + return ex.beginImplicitTxn(ctx, parseCmd.AST, ex.QualityOfService()) } } else if _, isAbortedTxn := ex.machine.CurState().(stateAborted); isAbortedTxn { if !ex.isAllowedInAbortedTxn(parseCmd.AST) { @@ -382,7 +382,7 @@ func (ex *connExecutor) execBind( // executing SHOW COMMIT TIMESTAMP as it would destroy the information // about the previously committed transaction. if _, ok := ps.AST.(*tree.ShowCommitTimestamp); !ok { - return ex.beginImplicitTxn(ctx, ps.AST) + return ex.beginImplicitTxn(ctx, ps.AST, ex.QualityOfService()) } } else if _, isAbortedTxn := ex.machine.CurState().(stateAborted); isAbortedTxn { if !ex.isAllowedInAbortedTxn(ps.AST) { diff --git a/pkg/sql/copy/BUILD.bazel b/pkg/sql/copy/BUILD.bazel index 7b5a23f73f18..c242235a6ef8 100644 --- a/pkg/sql/copy/BUILD.bazel +++ b/pkg/sql/copy/BUILD.bazel @@ -14,6 +14,7 @@ go_test( deps = [ "//pkg/base", "//pkg/cli/clisqlclient", + "//pkg/kv", "//pkg/kv/kvpb", "//pkg/kv/kvserver/kvserverbase", "//pkg/security/securityassets", @@ -34,6 +35,7 @@ go_test( "//pkg/testutils/skip", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", + "//pkg/util/admission/admissionpb", "//pkg/util/ctxgroup", "//pkg/util/encoding", "//pkg/util/encoding/csv", diff --git a/pkg/sql/copy/copy_in_test.go b/pkg/sql/copy/copy_in_test.go index 18ce59903f87..b4f8d016af12 100644 --- a/pkg/sql/copy/copy_in_test.go +++ b/pkg/sql/copy/copy_in_test.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/cli/clisqlclient" + "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql" @@ -35,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/randutil" @@ -502,7 +504,17 @@ func TestCopyFromRetries(t *testing.T) { var params base.TestServerArgs var attemptNumber int params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{ - BeforeCopyFromInsert: func() error { + BeforeCopyFromInsert: func(txn *kv.Txn) error { + if !tc.inTxn { + // When we're not in an explicit txn, we expect that 
all + // txns used by the COPY use the background QoS. + if txn.AdmissionHeader().Priority != int32(admissionpb.UserLowPri) { + t.Errorf( + "unexpected QoS level %d (expected %d)", + txn.AdmissionHeader().Priority, admissionpb.UserLowPri, + ) + } + } attemptNumber++ return tc.hook(attemptNumber) }, diff --git a/pkg/sql/copy/copy_test.go b/pkg/sql/copy/copy_test.go index 16b77b1ce3d8..f2168088bfe6 100644 --- a/pkg/sql/copy/copy_test.go +++ b/pkg/sql/copy/copy_test.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/apd/v3" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/cli/clisqlclient" + "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql" @@ -569,7 +570,7 @@ func TestLargeDynamicRows(t *testing.T) { var params base.TestServerArgs var batchNumber int params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{ - BeforeCopyFromInsert: func() error { + BeforeCopyFromInsert: func(*kv.Txn) error { batchNumber++ return nil }, diff --git a/pkg/sql/copy_from.go b/pkg/sql/copy_from.go index f4ed3c0fbff3..f04f750a4701 100644 --- a/pkg/sql/copy_from.go +++ b/pkg/sql/copy_from.go @@ -1083,7 +1083,7 @@ func (c *copyMachine) insertRowsInternal(ctx context.Context, finalBatch bool) ( retErr = cleanup(ctx, retErr) }() if c.p.ExecCfg().TestingKnobs.BeforeCopyFromInsert != nil { - if err := c.p.ExecCfg().TestingKnobs.BeforeCopyFromInsert(); err != nil { + if err := c.p.ExecCfg().TestingKnobs.BeforeCopyFromInsert(c.txnOpt.txn); err != nil { return err } } diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 586396f5dcf4..b04932184a22 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1652,7 +1652,7 @@ type ExecutorTestingKnobs struct { UseTransactionalDescIDGenerator bool // BeforeCopyFromInsert, if set, will be called during a COPY FROM insert statement. - BeforeCopyFromInsert func() error + BeforeCopyFromInsert func(txn *kv.Txn) error // CopyFromInsertRetry, if set, will be called when a COPY FROM insert statement is retried. CopyFromInsertRetry func() error From 044d04f2294ceaae3c464dfbfc46c9add5932f16 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Wed, 6 Dec 2023 13:39:18 -0800 Subject: [PATCH 2/5] importer: support arrays of UDT in some cases This commit adjusts the import code to support IMPORT INTO tables with columns typed as arrays of UDTs. This required a couple of minor changes: - propagating the `SemaCtx` into `ParseDatumStringAs`. Previously, we would always create a fresh one, but now all callers of this method have been adjusted to provide the one they have in scope. This allows us to parse expressions of the form `'Monday'::db.sc.enum_name` where `'Monday'` is a member of an enum. - implementing `importTypeResolver.ResolveType` in the "happy" case. This is used to resolve the casts from above to a concrete type. Note that the `ResolveType` implementation is incomplete in the general case. In particular, whenever an import job is created, we _might_ have a set of types available (it appears that this is the case when we're importing into a single table; in contrast, when importing a whole pgdump we can have multiple tables, and the set of types won't be available). Whenever we do have a set of types, we can simplify type resolution to matching on the name of the type. 
However, if we happen to have a table that uses two UDTs with the same name but different schemas, this simplistic resolution won't work, so we still return an error in this case. Release note (sql change): CockroachDB now supports IMPORT INTO a table that has columns typed as arrays of user-defined types (like enums). Tables that use multiple user-defined types with the same name but different schemas are still unsupported. --- pkg/cli/statement_bundle.go | 2 +- pkg/sql/importer/BUILD.bazel | 1 + pkg/sql/importer/import_into_test.go | 58 +++++++++++++++++++ pkg/sql/importer/import_type_resolver.go | 53 +++++++++++++---- pkg/sql/importer/read_import_avro.go | 17 ++++-- pkg/sql/importer/read_import_base.go | 5 +- pkg/sql/importer/read_import_csv.go | 2 +- pkg/sql/importer/read_import_mysql.go | 24 +++++--- pkg/sql/importer/read_import_mysql_test.go | 2 +- pkg/sql/importer/read_import_pgcopy.go | 2 +- pkg/sql/importer/read_import_pgdump.go | 6 +- pkg/sql/importer/read_import_workload.go | 12 ++-- pkg/sql/opt/testutils/testcat/test_catalog.go | 2 +- pkg/sql/row/row_converter.go | 12 ++-- pkg/sql/rowenc/roundtrip_format.go | 19 +++--- pkg/sql/rowenc/roundtrip_format_test.go | 6 +- pkg/sql/stats/json.go | 2 +- 17 files changed, 170 insertions(+), 55 deletions(-) diff --git a/pkg/cli/statement_bundle.go b/pkg/cli/statement_bundle.go index cd6ad77094f6..3ce4cfa4ecd7 100644 --- a/pkg/cli/statement_bundle.go +++ b/pkg/cli/statement_bundle.go @@ -329,7 +329,7 @@ func getExplainCombinations( } upperBound := bucket["upper_bound"].(string) bucketMap[key] = []string{upperBound} - datum, err := rowenc.ParseDatumStringAs(ctx, colType, upperBound, &evalCtx) + datum, err := rowenc.ParseDatumStringAs(ctx, colType, upperBound, &evalCtx, nil /* semaCtx */) if err != nil { panic("failed parsing datum string as " + datum.String() + " " + err.Error()) } diff --git a/pkg/sql/importer/BUILD.bazel b/pkg/sql/importer/BUILD.bazel index a8026d79f400..8c814204e42f 100644 --- a/pkg/sql/importer/BUILD.bazel +++ b/pkg/sql/importer/BUILD.bazel @@ -94,6 +94,7 @@ go_library( "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", "//pkg/sql/sqlclustersettings", + "//pkg/sql/sqlerrors", "//pkg/sql/sqltelemetry", "//pkg/sql/stats", "//pkg/sql/types", diff --git a/pkg/sql/importer/import_into_test.go b/pkg/sql/importer/import_into_test.go index 16ea3547b8ab..3c77fa955508 100644 --- a/pkg/sql/importer/import_into_test.go +++ b/pkg/sql/importer/import_into_test.go @@ -185,3 +185,61 @@ func getFirstStoreReplica( }) return store, repl } + +// TestImportIntoWithUDTArray verifies that we can support importing data into a +// table with a column typed as an array of user-defined types. +func TestImportIntoWithUDTArray(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + dir, dirCleanupFn := testutils.TempDir(t) + defer dirCleanupFn() + + ctx := context.Background() + srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{ + ExternalIODir: dir, + }) + defer srv.Stopper().Stop(ctx) + + runner := sqlutils.MakeSQLRunner(db) + runner.Exec(t, ` +CREATE TYPE weekday AS ENUM('Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday'); +CREATE TABLE shifts (employee STRING, days weekday[]); +INSERT INTO shifts VALUES ('John', ARRAY['Monday', 'Wednesday', 'Friday']); +INSERT INTO shifts VALUES ('Bob', ARRAY['Tuesday', 'Thursday']); +`) + // Sanity check that we currently have the expected state. 
+ expected := [][]string{ + {"John", "{Monday,Wednesday,Friday}"}, + {"Bob", "{Tuesday,Thursday}"}, + } + runner.CheckQueryResults(t, "SELECT * FROM shifts;", expected) + // Export has to run in a separate implicit txn. + runner.Exec(t, `EXPORT INTO CSV 'nodelocal://1/export1/' FROM SELECT * FROM shifts;`) + // Now clear the table since we'll be importing into it. + runner.Exec(t, `DELETE FROM shifts WHERE true;`) + runner.CheckQueryResults(t, "SELECT count(*) FROM shifts;", [][]string{{"0"}}) + // Import two rows once. + runner.Exec(t, "IMPORT INTO shifts CSV DATA ('nodelocal://1/export1/export*-n*.0.csv');") + runner.CheckQueryResults(t, "SELECT * FROM shifts;", expected) + // Import two rows again - we'll now have four rows in the table. + runner.Exec(t, "IMPORT INTO shifts CSV DATA ('nodelocal://1/export1/export*-n*.0.csv');") + runner.CheckQueryResults(t, "SELECT * FROM shifts;", append(expected, expected...)) + + // We currently don't support importing into a table that has columns with + // UDTs with the same name but different schemas. + runner.Exec(t, ` +CREATE SCHEMA short; +CREATE TYPE short.weekday AS ENUM('M', 'Tu', 'W', 'Th', 'F'); +DROP TABLE shifts; +CREATE TABLE shifts (employee STRING, days weekday[], days_short short.weekday[]); +INSERT INTO shifts VALUES ('John', ARRAY['Monday', 'Wednesday', 'Friday'], ARRAY['M', 'W', 'F']); +INSERT INTO shifts VALUES ('Bob', ARRAY['Tuesday', 'Thursday'], ARRAY['Tu', 'Th']); +`) + runner.Exec(t, `EXPORT INTO CSV 'nodelocal://1/export2/' FROM SELECT * FROM shifts;`) + runner.ExpectErr( + t, + ".*tables with multiple user-defined types with the same name are currently unsupported.*", + "IMPORT INTO shifts CSV DATA ('nodelocal://1/export2/export*-n*.0.csv');", + ) +} diff --git a/pkg/sql/importer/import_type_resolver.go b/pkg/sql/importer/import_type_resolver.go index 3fff08e3b1de..7d9f21aa477a 100644 --- a/pkg/sql/importer/import_type_resolver.go +++ b/pkg/sql/importer/import_type_resolver.go @@ -16,7 +16,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/errors" "github.com/lib/pq/oid" @@ -24,36 +27,64 @@ import ( type importTypeResolver struct { typeIDToDesc map[descpb.ID]*descpb.TypeDescriptor - typeNameToDesc map[string]*descpb.TypeDescriptor + typeNameToDesc map[string][]*descpb.TypeDescriptor } -func newImportTypeResolver(typeDescs []*descpb.TypeDescriptor) importTypeResolver { +var _ tree.TypeReferenceResolver = importTypeResolver{} +var _ catalog.TypeDescriptorResolver = importTypeResolver{} + +func makeImportTypeResolver(typeDescs []*descpb.TypeDescriptor) importTypeResolver { itr := importTypeResolver{ typeIDToDesc: make(map[descpb.ID]*descpb.TypeDescriptor), - typeNameToDesc: make(map[string]*descpb.TypeDescriptor), + typeNameToDesc: make(map[string][]*descpb.TypeDescriptor), } for _, typeDesc := range typeDescs { itr.typeIDToDesc[typeDesc.GetID()] = typeDesc - itr.typeNameToDesc[typeDesc.GetName()] = typeDesc + name := typeDesc.GetName() + itr.typeNameToDesc[name] = append(itr.typeNameToDesc[name], typeDesc) } return itr } 
-var _ tree.TypeReferenceResolver = &importTypeResolver{} - +// ResolveType implements the tree.TypeReferenceResolver interface. +// +// We currently have an incomplete implementation of this method - namely, it +// works whenever typeDescs are provided in makeImportTypeResolver (which is the +// case when we're importing into exactly one table). In such a case, the type +// resolution can be simplified to only look up into the provided types based on +// the type's name (meaning we can avoid resolving the db and the schema names). +// +// Note that if a table happens to have multiple types with the same name (but +// different schemas), this implementation will return a "feature unsupported" +// error. func (i importTypeResolver) ResolveType( - _ context.Context, _ *tree.UnresolvedObjectName, + ctx context.Context, name *tree.UnresolvedObjectName, ) (*types.T, error) { - return nil, errors.New("importTypeResolver does not implement ResolveType") + var descs []*descpb.TypeDescriptor + var ok bool + if descs, ok = i.typeNameToDesc[name.Parts[0]]; !ok || len(descs) == 0 { + return nil, sqlerrors.NewUndefinedTypeError(name) + } + if len(descs) > 1 { + return nil, pgerror.New( + pgcode.FeatureNotSupported, + "tables with multiple user-defined types with the same name are currently unsupported", + ) + } + typeDesc := typedesc.NewBuilder(descs[0]).BuildImmutableType() + t := typeDesc.AsTypesT() + if err := typedesc.EnsureTypeIsHydrated(ctx, t, i); err != nil { + return nil, err + } + return t, nil } +// ResolveTypeByOID implements the tree.TypeReferenceResolver interface. func (i importTypeResolver) ResolveTypeByOID(ctx context.Context, oid oid.Oid) (*types.T, error) { return typedesc.ResolveHydratedTByOID(ctx, oid, i) } -var _ catalog.TypeDescriptorResolver = &importTypeResolver{} - -// GetTypeDescriptor implements the sqlbase.TypeDescriptorResolver interface. +// GetTypeDescriptor implements the catalog.TypeDescriptorResolver interface. func (i importTypeResolver) GetTypeDescriptor( _ context.Context, id descpb.ID, ) (tree.TypeName, catalog.TypeDescriptor, error) { diff --git a/pkg/sql/importer/read_import_avro.go b/pkg/sql/importer/read_import_avro.go index 2cf373ccdad4..ca050421ee31 100644 --- a/pkg/sql/importer/read_import_avro.go +++ b/pkg/sql/importer/read_import_avro.go @@ -73,7 +73,12 @@ func nativeTimeToDatum(t time.Time, targetT *types.T) (tree.Datum, error) { // the key is a primitive or logical Avro type name ("string", // "long.time-millis", etc). func nativeToDatum( - ctx context.Context, x interface{}, targetT *types.T, avroT []string, evalCtx *eval.Context, + ctx context.Context, + x interface{}, + targetT *types.T, + avroT []string, + evalCtx *eval.Context, + semaCtx *tree.SemaContext, ) (tree.Datum, error) { var d tree.Datum @@ -111,19 +116,19 @@ func nativeToDatum( // []byte arrays are hard. Sometimes we want []bytes, sometimes // we want StringFamily. So, instead of creating DBytes datum, // parse this data to "cast" it to our expected type. - return rowenc.ParseDatumStringAs(ctx, targetT, string(v), evalCtx) + return rowenc.ParseDatumStringAs(ctx, targetT, string(v), evalCtx, semaCtx) } case string: // We allow strings to be specified for any column, as // long as we can convert the string value to the target type. - return rowenc.ParseDatumStringAs(ctx, targetT, v, evalCtx) + return rowenc.ParseDatumStringAs(ctx, targetT, v, evalCtx, semaCtx) case map[string]interface{}: for _, aT := range avroT { // The value passed in is an avro schema. 
Extract // possible primitive types from the dictionary and // attempt to convert those values to our target type. if val, ok := v[aT]; ok { - return nativeToDatum(ctx, val, targetT, avroT, evalCtx) + return nativeToDatum(ctx, val, targetT, avroT, evalCtx, semaCtx) } } case []interface{}: @@ -139,7 +144,7 @@ func nativeToDatum( // Convert each element. arr := tree.NewDArray(targetT.ArrayContents()) for _, elt := range v { - eltDatum, err := nativeToDatum(ctx, elt, targetT.ArrayContents(), eltAvroT, evalCtx) + eltDatum, err := nativeToDatum(ctx, elt, targetT.ArrayContents(), eltAvroT, evalCtx, semaCtx) if err == nil { err = arr.Append(eltDatum) } @@ -230,7 +235,7 @@ func (a *avroConsumer) convertNative( if !ok { return fmt.Errorf("cannot convert avro value %v to col %s", v, conv.VisibleCols[idx].GetType().Name()) } - datum, err := nativeToDatum(ctx, v, typ, avroT, conv.EvalCtx) + datum, err := nativeToDatum(ctx, v, typ, avroT, conv.EvalCtx, conv.SemaCtx) if err != nil { return err } diff --git a/pkg/sql/importer/read_import_base.go b/pkg/sql/importer/read_import_base.go index 331fd2fd9f99..206a6030b516 100644 --- a/pkg/sql/importer/read_import_base.go +++ b/pkg/sql/importer/read_import_base.go @@ -60,7 +60,7 @@ func runImport( // Install type metadata in all of the import tables. spec = protoutil.Clone(spec).(*execinfrapb.ReadImportDataSpec) - importResolver := newImportTypeResolver(spec.Types) + importResolver := makeImportTypeResolver(spec.Types) for _, table := range spec.Tables { cpy := tabledesc.NewBuilder(table.Desc).BuildCreatedMutableTable() if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, importResolver); err != nil { @@ -493,7 +493,8 @@ func makeDatumConverter( ) (*row.DatumRowConverter, error) { conv, err := row.NewDatumRowConverter( ctx, importCtx.semaCtx, importCtx.tableDesc, importCtx.targetCols, importCtx.evalCtx, - importCtx.kvCh, importCtx.seqChunkProvider, nil /* metrics */, db) + importCtx.kvCh, importCtx.seqChunkProvider, nil /* metrics */, db, + ) if err == nil { conv.KvBatch.Source = fileCtx.source } diff --git a/pkg/sql/importer/read_import_csv.go b/pkg/sql/importer/read_import_csv.go index b09b891fe645..575a4ce109b2 100644 --- a/pkg/sql/importer/read_import_csv.go +++ b/pkg/sql/importer/read_import_csv.go @@ -219,7 +219,7 @@ func (c *csvRowConsumer) FillDatums( conv.Datums[datumIdx] = tree.DNull } else { var err error - conv.Datums[datumIdx], err = rowenc.ParseDatumStringAs(ctx, conv.VisibleColTypes[i], field.Val, conv.EvalCtx) + conv.Datums[datumIdx], err = rowenc.ParseDatumStringAs(ctx, conv.VisibleColTypes[i], field.Val, conv.EvalCtx, conv.SemaCtx) if err != nil { // Fallback to parsing as a string literal. This allows us to support // both array expressions (like `ARRAY[1, 2, 3]`) and literals (like diff --git a/pkg/sql/importer/read_import_mysql.go b/pkg/sql/importer/read_import_mysql.go index 887f25c8aa35..f244e8f10224 100644 --- a/pkg/sql/importer/read_import_mysql.go +++ b/pkg/sql/importer/read_import_mysql.go @@ -82,9 +82,11 @@ func newMysqldumpReader( converters[name] = nil continue } - conv, err := row.NewDatumRowConverter(ctx, semaCtx, tabledesc.NewBuilder(table.Desc). 
- BuildImmutableTable(), nil /* targetColNames */, evalCtx, kvCh, - nil /* seqChunkProvider */, nil /* metrics */, db) + conv, err := row.NewDatumRowConverter( + ctx, semaCtx, tabledesc.NewBuilder(table.Desc).BuildImmutableTable(), + nil /* targetColNames */, evalCtx, kvCh, + nil /* seqChunkProvider */, nil /* metrics */, db, + ) if err != nil { return nil, err } @@ -171,7 +173,7 @@ func (m *mysqldumpReader) readFile( return errors.Errorf("expected %d values, got %d: %v", expected, got, inputRow) } for i, raw := range inputRow { - converted, err := mysqlValueToDatum(ctx, raw, conv.VisibleColTypes[i], conv.EvalCtx) + converted, err := mysqlValueToDatum(ctx, raw, conv.VisibleColTypes[i], conv.EvalCtx, conv.SemaCtx) if err != nil { return errors.Wrapf(err, "reading row %d (%d in insert statement %d)", count, count-startingCount, inserts) @@ -227,7 +229,11 @@ func mysqlStrToDatum(evalCtx *eval.Context, s string, desired *types.T) (tree.Da // wrapper types are: StrVal, IntVal, FloatVal, HexNum, HexVal, ValArg, BitVal // as well as NullVal. func mysqlValueToDatum( - ctx context.Context, raw mysql.Expr, desired *types.T, evalContext *eval.Context, + ctx context.Context, + raw mysql.Expr, + desired *types.T, + evalContext *eval.Context, + semaCtx *tree.SemaContext, ) (tree.Datum, error) { switch v := raw.(type) { case mysql.BoolVal: @@ -255,9 +261,9 @@ func mysqlValueToDatum( } return mysqlStrToDatum(evalContext, s, desired) case mysql.IntVal: - return rowenc.ParseDatumStringAs(ctx, desired, string(v.Val), evalContext) + return rowenc.ParseDatumStringAs(ctx, desired, string(v.Val), evalContext, semaCtx) case mysql.FloatVal: - return rowenc.ParseDatumStringAs(ctx, desired, string(v.Val), evalContext) + return rowenc.ParseDatumStringAs(ctx, desired, string(v.Val), evalContext, semaCtx) case mysql.HexVal: v, err := v.HexDecode() return tree.NewDBytes(tree.DBytes(v)), err @@ -270,7 +276,7 @@ func mysqlValueToDatum( case *mysql.UnaryExpr: switch v.Operator { case mysql.UMinusOp: - parsed, err := mysqlValueToDatum(ctx, v.Expr, desired, evalContext) + parsed, err := mysqlValueToDatum(ctx, v.Expr, desired, evalContext, semaCtx) if err != nil { return nil, err } @@ -289,7 +295,7 @@ func mysqlValueToDatum( } case mysql.UBinaryOp: // TODO(dt): do we want to use this hint to change our decoding logic? 
- return mysqlValueToDatum(ctx, v.Expr, desired, evalContext) + return mysqlValueToDatum(ctx, v.Expr, desired, evalContext, semaCtx) default: return nil, errors.Errorf("unexpected operator: %q", v.Operator) } diff --git a/pkg/sql/importer/read_import_mysql_test.go b/pkg/sql/importer/read_import_mysql_test.go index eb89c4628170..a8897e2979b9 100644 --- a/pkg/sql/importer/read_import_mysql_test.go +++ b/pkg/sql/importer/read_import_mysql_test.go @@ -374,7 +374,7 @@ func TestMysqlValueToDatum(t *testing.T) { evalContext := eval.NewTestingEvalContext(st) for _, tc := range tests { t.Run(fmt.Sprintf("%v", tc.raw), func(t *testing.T) { - got, err := mysqlValueToDatum(context.Background(), tc.raw, tc.typ, evalContext) + got, err := mysqlValueToDatum(context.Background(), tc.raw, tc.typ, evalContext, nil /* semaCtx */) if err != nil { t.Fatal(err) } diff --git a/pkg/sql/importer/read_import_pgcopy.go b/pkg/sql/importer/read_import_pgcopy.go index f6b43f11ed51..ff4fc1370d9f 100644 --- a/pkg/sql/importer/read_import_pgcopy.go +++ b/pkg/sql/importer/read_import_pgcopy.go @@ -336,7 +336,7 @@ func (p *pgCopyConsumer) FillDatums( if s == nil { conv.Datums[i] = tree.DNull } else { - conv.Datums[i], err = rowenc.ParseDatumStringAs(ctx, conv.VisibleColTypes[i], *s, conv.EvalCtx) + conv.Datums[i], err = rowenc.ParseDatumStringAs(ctx, conv.VisibleColTypes[i], *s, conv.EvalCtx, conv.SemaCtx) if err != nil { col := conv.VisibleCols[i] return newImportRowError(errors.Wrapf( diff --git a/pkg/sql/importer/read_import_pgdump.go b/pkg/sql/importer/read_import_pgdump.go index c051b8612370..073e68991107 100644 --- a/pkg/sql/importer/read_import_pgdump.go +++ b/pkg/sql/importer/read_import_pgdump.go @@ -999,8 +999,10 @@ func newPgDumpReader( for i, col := range tableDesc.VisibleColumns() { colSubMap[col.GetName()] = i } - conv, err := row.NewDatumRowConverter(ctx, semaCtx, tableDesc, targetCols, evalCtx, kvCh, - nil /* seqChunkProvider */, nil /* metrics */, db) + conv, err := row.NewDatumRowConverter( + ctx, semaCtx, tableDesc, targetCols, evalCtx, kvCh, + nil /* seqChunkProvider */, nil /* metrics */, db, + ) if err != nil { return nil, err } diff --git a/pkg/sql/importer/read_import_workload.go b/pkg/sql/importer/read_import_workload.go index f9610fe22520..e7b03634dcbc 100644 --- a/pkg/sql/importer/read_import_workload.go +++ b/pkg/sql/importer/read_import_workload.go @@ -69,6 +69,7 @@ func makeDatumFromColOffset( alloc *tree.DatumAlloc, hint *types.T, evalCtx *eval.Context, + semaCtx *tree.SemaContext, col coldata.Vec, rowIdx int, ) (tree.Datum, error) { @@ -122,7 +123,7 @@ func makeDatumFromColOffset( default: data := col.Bytes().Get(rowIdx) str := *(*string)(unsafe.Pointer(&data)) - return rowenc.ParseDatumStringAs(ctx, hint, str, evalCtx) + return rowenc.ParseDatumStringAs(ctx, hint, str, evalCtx, semaCtx) } } return nil, errors.Errorf( @@ -238,8 +239,10 @@ func NewWorkloadKVConverter( func (w *WorkloadKVConverter) Worker( ctx context.Context, evalCtx *eval.Context, semaCtx *tree.SemaContext, ) error { - conv, err := row.NewDatumRowConverter(ctx, semaCtx, w.tableDesc, nil, /* targetColNames */ - evalCtx, w.kvCh, nil /* seqChunkProvider */, nil /* metrics */, w.db) + conv, err := row.NewDatumRowConverter( + ctx, semaCtx, w.tableDesc, nil, /* targetColNames */ + evalCtx, w.kvCh, nil /* seqChunkProvider */, nil /* metrics */, w.db, + ) if err != nil { return err } @@ -263,7 +266,8 @@ func (w *WorkloadKVConverter) Worker( // TODO(dan): This does a type switch once per-datum. 
Reduce this to // a one-time switch per column. converted, err := makeDatumFromColOffset( - ctx, &alloc, conv.VisibleColTypes[colIdx], evalCtx, col, rowIdx) + ctx, &alloc, conv.VisibleColTypes[colIdx], conv.EvalCtx, conv.SemaCtx, col, rowIdx, + ) if err != nil { return err } diff --git a/pkg/sql/opt/testutils/testcat/test_catalog.go b/pkg/sql/opt/testutils/testcat/test_catalog.go index 06f1028a1bac..a3fba04f812a 100644 --- a/pkg/sql/opt/testutils/testcat/test_catalog.go +++ b/pkg/sql/opt/testutils/testcat/test_catalog.go @@ -1339,7 +1339,7 @@ func (ts *TableStat) Histogram() []cat.HistogramBucket { for i := offset; i < len(ts.histogram); i++ { bucket := &ts.js.HistogramBuckets[i-offset] - datum, err := rowenc.ParseDatumStringAs(context.Background(), colType, bucket.UpperBound, evalCtx) + datum, err := rowenc.ParseDatumStringAs(context.Background(), colType, bucket.UpperBound, evalCtx, nil /* semaCtx */) if err != nil { panic(err) } diff --git a/pkg/sql/row/row_converter.go b/pkg/sql/row/row_converter.go index ea5757d199c9..08302466b689 100644 --- a/pkg/sql/row/row_converter.go +++ b/pkg/sql/row/row_converter.go @@ -218,6 +218,7 @@ type DatumRowConverter struct { // The rest of these are derived from tableDesc, just cached here. ri Inserter EvalCtx *eval.Context + SemaCtx *tree.SemaContext cols []catalog.Column VisibleCols []catalog.Column VisibleColTypes []*types.T @@ -352,8 +353,9 @@ func NewDatumRowConverter( // We take a copy of the baseSemaCtx since this method is called by the parallel // import workers. semaCtxCopy := *baseSemaCtx + c.SemaCtx = &semaCtxCopy cols := schemaexpr.ProcessColumnSet(targetCols, tableDesc, relevantColumns) - defaultExprs, err := schemaexpr.MakeDefaultExprs(ctx, cols, &txCtx, c.EvalCtx, &semaCtxCopy) + defaultExprs, err := schemaexpr.MakeDefaultExprs(ctx, cols, &txCtx, c.EvalCtx, c.SemaCtx) if err != nil { return nil, errors.Wrap(err, "process default and computed columns") } @@ -463,7 +465,8 @@ func NewDatumRowConverter( c.tableDesc, tree.NewUnqualifiedTableName(tree.Name(c.tableDesc.GetName())), c.EvalCtx, - &semaCtxCopy) + c.SemaCtx, + ) if err != nil { return nil, errors.Wrapf(err, "error type checking and building computed expression for IMPORT INTO") } @@ -471,8 +474,9 @@ func NewDatumRowConverter( // Here, partialIndexExprs will be nil if there are no partial indexes, or a // map of predicate expressions for each partial index in the input list of // indexes. - c.partialIndexExprs, _, err = schemaexpr.MakePartialIndexExprs(ctx, c.tableDesc.PartialIndexes(), - c.tableDesc.PublicColumns(), c.tableDesc, c.EvalCtx, &semaCtxCopy) + c.partialIndexExprs, _, err = schemaexpr.MakePartialIndexExprs( + ctx, c.tableDesc.PartialIndexes(), c.tableDesc.PublicColumns(), c.tableDesc, c.EvalCtx, c.SemaCtx, + ) if err != nil { return nil, errors.Wrapf(err, "error type checking and building partial index expression for IMPORT INTO") } diff --git a/pkg/sql/rowenc/roundtrip_format.go b/pkg/sql/rowenc/roundtrip_format.go index e0369a9b2239..002b18690b03 100644 --- a/pkg/sql/rowenc/roundtrip_format.go +++ b/pkg/sql/rowenc/roundtrip_format.go @@ -20,15 +20,19 @@ import ( ) // ParseDatumStringAs parses s as type t. This function is guaranteed to -// round-trip when printing a Datum with FmtExport. +// round-trip when printing a Datum with FmtExport. semaCtx is optional. 
func ParseDatumStringAs( - ctx context.Context, t *types.T, s string, evalCtx *eval.Context, + ctx context.Context, t *types.T, s string, evalCtx *eval.Context, semaCtx *tree.SemaContext, ) (tree.Datum, error) { switch t.Family() { - // We use a different parser for array types because ParseAndRequireString only parses - // the internal postgres string representation of arrays. + // We use a different parser for array types because ParseAndRequireString + // only parses the internal postgres string representation of arrays. case types.ArrayFamily, types.CollatedStringFamily: - return parseAsTyp(ctx, evalCtx, t, s) + if semaCtx == nil { + sema := tree.MakeSemaContext() + semaCtx = &sema + } + return parseAsTyp(ctx, evalCtx, semaCtx, t, s) default: res, _, err := tree.ParseAndRequireString(t, s, evalCtx) return res, err @@ -36,14 +40,13 @@ func ParseDatumStringAs( } func parseAsTyp( - ctx context.Context, evalCtx *eval.Context, typ *types.T, s string, + ctx context.Context, evalCtx *eval.Context, semaCtx *tree.SemaContext, typ *types.T, s string, ) (tree.Datum, error) { expr, err := parser.ParseExpr(s) if err != nil { return nil, err } - semaCtx := tree.MakeSemaContext() - typedExpr, err := tree.TypeCheck(ctx, expr, &semaCtx, typ) + typedExpr, err := tree.TypeCheck(ctx, expr, semaCtx, typ) if err != nil { return nil, err } diff --git a/pkg/sql/rowenc/roundtrip_format_test.go b/pkg/sql/rowenc/roundtrip_format_test.go index 502d7a7e9caa..8a212a5ac754 100644 --- a/pkg/sql/rowenc/roundtrip_format_test.go +++ b/pkg/sql/rowenc/roundtrip_format_test.go @@ -98,7 +98,7 @@ func TestRandParseDatumStringAs(t *testing.T) { t.Fatal(ds, err) } - parsed, err := rowenc.ParseDatumStringAs(context.Background(), typ, ds, evalCtx) + parsed, err := rowenc.ParseDatumStringAs(context.Background(), typ, ds, evalCtx, nil /* semaCtx */) if err != nil { t.Fatal(ds, err) } @@ -294,7 +294,7 @@ func TestParseDatumStringAs(t *testing.T) { t.Run(typ.String(), func(t *testing.T) { for _, s := range exprs { t.Run(fmt.Sprintf("%q", s), func(t *testing.T) { - d, err := rowenc.ParseDatumStringAs(context.Background(), typ, s, evalCtx) + d, err := rowenc.ParseDatumStringAs(context.Background(), typ, s, evalCtx, nil /* semaCtx */) if err != nil { t.Fatal(err) } @@ -302,7 +302,7 @@ func TestParseDatumStringAs(t *testing.T) { t.Fatalf("unexpected type: %s", d.ResolvedType()) } ds := tree.AsStringWithFlags(d, tree.FmtExport) - parsed, err := rowenc.ParseDatumStringAs(context.Background(), typ, ds, evalCtx) + parsed, err := rowenc.ParseDatumStringAs(context.Background(), typ, ds, evalCtx, nil /* semaCtx */) if err != nil { t.Fatal(err) } diff --git a/pkg/sql/stats/json.go b/pkg/sql/stats/json.go index 0959234add36..89ecdd1b3ea6 100644 --- a/pkg/sql/stats/json.go +++ b/pkg/sql/stats/json.go @@ -150,7 +150,7 @@ func (js *JSONStatistic) GetHistogram( h.Buckets = make([]HistogramData_Bucket, len(js.HistogramBuckets)) for i := range h.Buckets { hb := &js.HistogramBuckets[i] - upperVal, err := rowenc.ParseDatumStringAs(ctx, colType, hb.UpperBound, evalCtx) + upperVal, err := rowenc.ParseDatumStringAs(ctx, colType, hb.UpperBound, evalCtx, semaCtx) if err != nil { return nil, err } From faf2cde57df5b4783222362b7dda5a43e21c42cc Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 12 Dec 2023 18:34:02 -0800 Subject: [PATCH 3/5] roachtest: speed up import cancellation Previously, when determining the number of TPCH queries we're running during the IMPORT, we used the length of the string, which made it so that we ran queries more than we 
intended. This is now fixed. Epic: None Release note: None --- pkg/cmd/roachtest/tests/import_cancellation.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/cmd/roachtest/tests/import_cancellation.go b/pkg/cmd/roachtest/tests/import_cancellation.go index 2a9e8dafe247..ca9c997b0df5 100644 --- a/pkg/cmd/roachtest/tests/import_cancellation.go +++ b/pkg/cmd/roachtest/tests/import_cancellation.go @@ -146,6 +146,7 @@ func runImportCancellation(ctx context.Context, t test.Test, c cluster.Cluster) // for Scale Factor 1, so since we're using Scale Factor 100 some TPCH // queries are expected to return different results - skip those. var queries string + var numQueries int for i := 1; i <= tpch.NumQueries; i++ { switch i { case 11, 13, 16, 18, 20: @@ -155,11 +156,12 @@ func runImportCancellation(ctx context.Context, t test.Test, c cluster.Cluster) queries += "," } queries += strconv.Itoa(i) + numQueries++ } } // maxOps flag will allow us to exit the workload once all the queries // were run 2 times. - maxOps := 2 * len(queries) + maxOps := 2 * numQueries cmd := fmt.Sprintf( "./workload run tpch --db=csv --concurrency=1 --queries=%s --max-ops=%d {pgurl%s} "+ "--enable-checks=true", queries, maxOps, c.All()) From 1863417c36ff38146fe08d08d245431732286ae4 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Tue, 12 Dec 2023 20:13:21 -0700 Subject: [PATCH 4/5] roachtest: remove or roachtests without post restore workload We no longer need them. Epic: none Release note: none --- pkg/cmd/roachtest/tests/online_restore.go | 186 ++++++++++------------ 1 file changed, 88 insertions(+), 98 deletions(-) diff --git a/pkg/cmd/roachtest/tests/online_restore.go b/pkg/cmd/roachtest/tests/online_restore.go index d3bf9e7ef054..7cc2f9cab1ec 100644 --- a/pkg/cmd/roachtest/tests/online_restore.go +++ b/pkg/cmd/roachtest/tests/online_restore.go @@ -60,111 +60,101 @@ func registerOnlineRestore(r registry.Registry) { }, } { for _, runOnline := range []bool{true, false} { - for _, runWorkload := range []bool{true, false} { - sp := sp - runOnline := runOnline - runWorkload := runWorkload + sp := sp + runOnline := runOnline - if runOnline { - sp.namePrefix = "online/" - } else { - sp.namePrefix = "offline/" - sp.skip = "used for ad hoc experiments" - } - sp.namePrefix = sp.namePrefix + fmt.Sprintf("workload=%t", runWorkload) + if runOnline { + sp.namePrefix = "online" + } else { + sp.namePrefix = "offline" + sp.skip = "used for ad hoc experiments" + } - sp.initTestName() - r.Add(registry.TestSpec{ - Name: sp.testName, - Owner: registry.OwnerDisasterRecovery, - Benchmark: true, - Cluster: sp.hardware.makeClusterSpecs(r, sp.backup.cloud), - Timeout: sp.timeout, - // These tests measure performance. To ensure consistent perf, - // disable metamorphic encryption. - EncryptionSupport: registry.EncryptionAlwaysDisabled, - CompatibleClouds: registry.Clouds(sp.backup.cloud), - Suites: sp.suites, - Skip: sp.skip, - Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { + sp.initTestName() + r.Add(registry.TestSpec{ + Name: sp.testName, + Owner: registry.OwnerDisasterRecovery, + Benchmark: true, + Cluster: sp.hardware.makeClusterSpecs(r, sp.backup.cloud), + Timeout: sp.timeout, + // These tests measure performance. To ensure consistent perf, + // disable metamorphic encryption. 
+ EncryptionSupport: registry.EncryptionAlwaysDisabled, + CompatibleClouds: registry.Clouds(sp.backup.cloud), + Suites: sp.suites, + Skip: sp.skip, + Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { - testStartTime := timeutil.Now() + testStartTime := timeutil.Now() - rd := makeRestoreDriver(t, c, sp) - rd.prepareCluster(ctx) + rd := makeRestoreDriver(t, c, sp) + rd.prepareCluster(ctx) - m := c.NewMonitor(ctx, sp.hardware.getCRDBNodes()) - m.Go(func(ctx context.Context) error { - db, err := rd.c.ConnE(ctx, rd.t.L(), rd.c.Node(1)[0]) - if err != nil { - return err - } - defer db.Close() - if _, err := db.Exec("SET CLUSTER SETTING kv.queue.process.guaranteed_time_budget='1h'"); err != nil { - return err - } - if _, err := db.Exec("SET CLUSTER SETTING kv.snapshot_receiver.excise.enabled=true"); err != nil { - return err - } - if _, err := db.Exec("SET CLUSTER SETTING sql.stats.automatic_collection.enabled=false"); err != nil { - return err - } - opts := "" - if runOnline { - opts = "WITH EXPERIMENTAL DEFERRED COPY" - } - restoreCmd := rd.restoreCmd("DATABASE tpce", opts) - t.L().Printf("Running %s", restoreCmd) - if _, err = db.ExecContext(ctx, restoreCmd); err != nil { - return err - } - return nil - }) - m.Wait() + m := c.NewMonitor(ctx, sp.hardware.getCRDBNodes()) + m.Go(func(ctx context.Context) error { + db, err := rd.c.ConnE(ctx, rd.t.L(), rd.c.Node(1)[0]) + if err != nil { + return err + } + defer db.Close() + if _, err := db.Exec("SET CLUSTER SETTING kv.queue.process.guaranteed_time_budget='1h'"); err != nil { + return err + } + if _, err := db.Exec("SET CLUSTER SETTING kv.snapshot_receiver.excise.enabled=true"); err != nil { + return err + } + if _, err := db.Exec("SET CLUSTER SETTING sql.stats.automatic_collection.enabled=false"); err != nil { + return err + } + opts := "" + if runOnline { + opts = "WITH EXPERIMENTAL DEFERRED COPY" + } + restoreCmd := rd.restoreCmd("DATABASE tpce", opts) + t.L().Printf("Running %s", restoreCmd) + if _, err = db.ExecContext(ctx, restoreCmd); err != nil { + return err + } + return nil + }) + m.Wait() - workloadCtx, workloadCancel := context.WithCancel(ctx) - mDownload := c.NewMonitor(workloadCtx, sp.hardware.getCRDBNodes()) - // TODO(msbutler): add foreground query latency tracker + workloadCtx, workloadCancel := context.WithCancel(ctx) + mDownload := c.NewMonitor(workloadCtx, sp.hardware.getCRDBNodes()) + // TODO(msbutler): add foreground query latency tracker - mDownload.Go(func(ctx context.Context) error { - if !runWorkload { - fmt.Printf("roachtest configured to skip running the foreground workload") - return nil - } - err := sp.backup.workload.run(ctx, t, c, sp.hardware) - // We expect the workload to return a context cancelled error because - // the roachtest driver cancels the monitor's context after the download job completes - if err != nil && ctx.Err() == nil { - // Implies the workload context was not cancelled and the workload cmd returned a - // different error. - return errors.Wrapf(err, `Workload context was not cancelled. Error returned by workload cmd`) - } - rd.t.L().Printf("workload successfully finished") - return nil - }) - mDownload.Go(func(ctx context.Context) error { - defer workloadCancel() - if runOnline { - return waitForDownloadJob(ctx, c, t.L()) - } - if runWorkload { - // If we just completed an offline restore and are running the - // workload, run the workload until we're at most 15 minutes - // away from timing out. 
- testRuntime := timeutil.Since(testStartTime) - workloadDuration := sp.timeout - testRuntime - if workloadDuration > time.Minute*15 { - workloadDuration = workloadDuration - time.Minute*15 - } - fmt.Printf("let workload run for %.2f minutes", workloadDuration.Minutes()) - time.Sleep(workloadDuration) - } - return nil - }) - mDownload.Wait() - }, - }) - } + mDownload.Go(func(ctx context.Context) error { + err := sp.backup.workload.run(ctx, t, c, sp.hardware) + // We expect the workload to return a context cancelled error because + // the roachtest driver cancels the monitor's context after the download job completes + if err != nil && ctx.Err() == nil { + // Implies the workload context was not cancelled and the workload cmd returned a + // different error. + return errors.Wrapf(err, `Workload context was not cancelled. Error returned by workload cmd`) + } + rd.t.L().Printf("workload successfully finished") + return nil + }) + mDownload.Go(func(ctx context.Context) error { + defer workloadCancel() + if runOnline { + return waitForDownloadJob(ctx, c, t.L()) + } + // If we just completed an offline restore and are running the + // workload, run the workload until we're at most 15 minutes + // away from timing out. + testRuntime := timeutil.Since(testStartTime) + workloadDuration := sp.timeout - testRuntime + if workloadDuration > time.Minute*15 { + workloadDuration = workloadDuration - time.Minute*15 + } + fmt.Printf("let workload run for %.2f minutes", workloadDuration.Minutes()) + time.Sleep(workloadDuration) + return nil + }) + mDownload.Wait() + }, + }) } } } From 7fe7a2dd7f873abec59bdc8244ade8cab258cfdf Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Tue, 12 Dec 2023 20:16:23 -0700 Subject: [PATCH 5/5] roachtest: unskip restore/online/tpce/400GB/aws/inc-count=1/nodes=4/cpus=8 Epic: none Release note: none --- pkg/cmd/roachtest/tests/online_restore.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/cmd/roachtest/tests/online_restore.go b/pkg/cmd/roachtest/tests/online_restore.go index 7cc2f9cab1ec..0126a2ad6a7d 100644 --- a/pkg/cmd/roachtest/tests/online_restore.go +++ b/pkg/cmd/roachtest/tests/online_restore.go @@ -43,7 +43,6 @@ func registerOnlineRestore(r registry.Registry) { timeout: 5 * time.Hour, suites: registry.Suites(registry.Nightly), restoreUptoIncremental: 1, - skip: "used for ad hoc experiments", }, { // 8TB tpce Online Restore