diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html
index b32f3c4bb659..ef32d85a0689 100644
--- a/docs/generated/metrics/metrics.html
+++ b/docs/generated/metrics/metrics.html
@@ -1095,6 +1095,18 @@
 <tr><td>APPLICATION</td><td>jobs.import.resume_completed</td><td>Number of import jobs which successfully resumed to completion</td><td>jobs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
 <tr><td>APPLICATION</td><td>jobs.import.resume_failed</td><td>Number of import jobs which failed with a non-retriable error</td><td>jobs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
 <tr><td>APPLICATION</td><td>jobs.import.resume_retry_error</td><td>Number of import jobs which failed with a retriable error</td><td>jobs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
+<tr><td>APPLICATION</td><td>jobs.import_rollback.currently_idle</td><td>Number of import_rollback jobs currently considered Idle and can be freely shut down</td><td>jobs</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
+<tr><td>APPLICATION</td><td>jobs.import_rollback.currently_paused</td><td>Number of import_rollback jobs currently considered Paused</td><td>jobs</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
+<tr><td>APPLICATION</td><td>jobs.import_rollback.currently_running</td><td>Number of import_rollback jobs currently running in Resume or OnFailOrCancel state</td><td>jobs</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
+<tr><td>APPLICATION</td><td>jobs.import_rollback.expired_pts_records</td><td>Number of expired protected timestamp records owned by import_rollback jobs</td><td>records</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
+<tr><td>APPLICATION</td><td>jobs.import_rollback.fail_or_cancel_completed</td><td>Number of import_rollback jobs which successfully completed their failure or cancelation process</td><td>jobs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
+<tr><td>APPLICATION</td><td>jobs.import_rollback.fail_or_cancel_failed</td><td>Number of import_rollback jobs which failed with a non-retriable error on their failure or cancelation process</td><td>jobs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
+<tr><td>APPLICATION</td><td>jobs.import_rollback.fail_or_cancel_retry_error</td><td>Number of import_rollback jobs which failed with a retriable error on their failure or cancelation process</td><td>jobs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
+<tr><td>APPLICATION</td><td>jobs.import_rollback.protected_age_sec</td><td>The age of the oldest PTS record protected by import_rollback jobs</td><td>seconds</td><td>GAUGE</td><td>SECONDS</td><td>AVG</td><td>NONE</td></tr>
+<tr><td>APPLICATION</td><td>jobs.import_rollback.protected_record_count</td><td>Number of protected timestamp records held by import_rollback jobs</td><td>records</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
+<tr><td>APPLICATION</td><td>jobs.import_rollback.resume_completed</td><td>Number of import_rollback jobs which successfully resumed to completion</td><td>jobs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
+<tr><td>APPLICATION</td><td>jobs.import_rollback.resume_failed</td><td>Number of import_rollback jobs which failed with a non-retriable error</td><td>jobs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
+<tr><td>APPLICATION</td><td>jobs.import_rollback.resume_retry_error</td><td>Number of import_rollback jobs which failed with a retriable error</td><td>jobs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
 <tr><td>APPLICATION</td><td>jobs.key_visualizer.currently_idle</td><td>Number of key_visualizer jobs currently considered Idle and can be freely shut down</td><td>jobs</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
 <tr><td>APPLICATION</td><td>jobs.key_visualizer.currently_paused</td><td>Number of key_visualizer jobs currently considered Paused</td><td>jobs</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
 <tr><td>APPLICATION</td><td>jobs.key_visualizer.currently_running</td><td>Number of key_visualizer jobs currently running in Resume or OnFailOrCancel state</td><td>jobs</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go
index 4c906b9efcae..f5267ff393d1 100644
--- a/pkg/ccl/backupccl/restore_job.go
+++ b/pkg/ccl/backupccl/restore_job.go
@@ -812,18 +812,25 @@ func shouldPreRestore(table *tabledesc.Mutable) bool {
 // backedUpDescriptorWithInProgressImportInto returns true if the backed up descriptor represents a table with an in
 // progress import that started in a cluster finalized to version 22.2.
-func backedUpDescriptorWithInProgressImportInto(
-	ctx context.Context, p sql.JobExecContext, desc catalog.Descriptor,
-) (bool, error) {
+func backedUpDescriptorWithInProgressImportInto(desc catalog.Descriptor) bool {
 	table, ok := desc.(catalog.TableDescriptor)
 	if !ok {
-		return false, nil
+		return false
 	}
-	if table.GetInProgressImportStartTime() == 0 {
-		return false, nil
+	return table.GetInProgressImportStartTime() > 0
+}
+
+// epochBasedInProgressImport returns true if the backed up descriptor
+// represents a table with an in-progress import that used
+// ImportEpochs.
+func epochBasedInProgressImport(desc catalog.Descriptor) bool {
+	table, ok := desc.(catalog.TableDescriptor)
+	if !ok {
+		return false
 	}
-	return true, nil
+
+	return table.GetInProgressImportStartTime() > 0 && table.TableDesc().ImportEpoch > 0
 }
 
 // createImportingDescriptors creates the tables that we will restore into and returns up to three
@@ -886,22 +893,32 @@ func createImportingDescriptors(
 	for _, desc := range sqlDescs {
 		// Decide which offline tables to include in the restore:
 		//
-		// - An offline table created by RESTORE or IMPORT PGDUMP is fully discarded.
-		//   The table will not exist in the restoring cluster.
+		// - An offline table created by RESTORE or IMPORT PGDUMP is
+		// fully discarded. The table will not exist in the restoring
+		// cluster.
+		//
+		// - An offline table undergoing an IMPORT INTO in traditional
+		// restore has all importing data elided in the restore
+		// processor and is restored online to its pre-import state.
+		//
+		// - An offline table undergoing an IMPORT INTO in online
+		// restore with no ImportEpoch cannot be restored and an error
+		// is returned.
 		//
-		// - An offline table undergoing an IMPORT INTO has all importing data
-		//   elided in the restore processor and is restored online to its pre import
-		//   state.
+		// - An offline table undergoing an IMPORT INTO in online
+		// restore with an ImportEpoch is restored as an offline
+		// table, and a revert job is queued that will bring the table
+		// back online.
 		if desc.Offline() {
 			if schema, ok := desc.(catalog.SchemaDescriptor); ok {
 				offlineSchemas[schema.GetID()] = struct{}{}
 			}
 
-			if hasInProgressImportInto, err := backedUpDescriptorWithInProgressImportInto(ctx, p, desc); err != nil {
-				return nil, nil, nil, err
-			} else if hasInProgressImportInto && details.ExperimentalOnline {
-				return nil, nil, nil, errors.Newf("table %s (id %d) in restoring backup has an in-progress import, but online restore cannot be run on a table with an in progress import", desc.GetName(), desc.GetID())
-			} else if !hasInProgressImportInto {
+			if backedUpDescriptorWithInProgressImportInto(desc) {
+				if details.ExperimentalOnline && !epochBasedInProgressImport(desc) {
+					return nil, nil, nil, errors.Newf("table %s (id %d) in restoring backup has an in-progress import, but online restore cannot be run on a table with an in progress import", desc.GetName(), desc.GetID())
+				}
+			} else {
 				continue
 			}
 		}
@@ -2256,7 +2273,8 @@ func (r *restoreResumer) publishDescriptors(
 	// Write the new TableDescriptors and flip state over to public so they can be
 	// accessed.
 	for i := range details.TableDescs {
-		mutTable := all.LookupDescriptor(details.TableDescs[i].GetID()).(*tabledesc.Mutable)
+		desc := all.LookupDescriptor(details.TableDescs[i].GetID())
+		mutTable := desc.(*tabledesc.Mutable)
 
 		if details.ExperimentalOnline && mutTable.IsTable() {
 			// We disable automatic stats refresh on all restored tables until the
@@ -2296,10 +2314,6 @@ func (r *restoreResumer) publishDescriptors(
 			mutTable.RowLevelTTL.ScheduleID = j.ScheduleID()
 		}
 
-		// If this was an importing table, it is now effectively _not_
-		// importing.
-		mutTable.FinalizeImport()
-
 		newTables = append(newTables, mutTable.TableDesc())
 
 		// Convert any mutations that were in progress on the table descriptor
@@ -2310,6 +2324,17 @@ func (r *restoreResumer) publishDescriptors(
 			return err
 		}
+
+		if details.ExperimentalOnline && epochBasedInProgressImport(desc) {
+			if err := createImportRollbackJob(ctx,
+				r.execCfg.JobRegistry, txn, r.job.Payload().UsernameProto.Decode(), mutTable,
+			); err != nil {
+				return err
+			}
+		} else {
+			// If this was an importing table, it is now effectively _not_
+			// importing.
+			mutTable.FinalizeImport()
+		}
 	}
 	// For all of the newly created types, make type schema change jobs for any
 	// type descriptors that were backed up in the middle of a type schema change.
@@ -2340,7 +2365,11 @@ func (r *restoreResumer) publishDescriptors(
 		b := txn.KV().NewBatch()
 		if err := all.ForEachDescriptor(func(desc catalog.Descriptor) error {
 			d := desc.(catalog.MutableDescriptor)
-			d.SetPublic()
+			if details.ExperimentalOnline && epochBasedInProgressImport(desc) {
+				log.Infof(ctx, "table %q (%d) with in-progress IMPORT remaining offline", desc.GetName(), desc.GetID())
+			} else {
+				d.SetPublic()
+			}
 			return txn.Descriptors().WriteDescToBatch(ctx, kvTrace, d, b)
 		}); err != nil {
 			return err
diff --git a/pkg/ccl/backupccl/restore_online.go b/pkg/ccl/backupccl/restore_online.go
index ecd7b2e1080d..4ce4dd352fd3 100644
--- a/pkg/ccl/backupccl/restore_online.go
+++ b/pkg/ccl/backupccl/restore_online.go
@@ -19,10 +19,13 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/security/username"
 	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
 	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/sql"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
 	"github.com/cockroachdb/cockroach/pkg/sql/isql"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
@@ -650,3 +653,24 @@ func (r *restoreResumer) cleanupAfterDownload(
 	}
 	return nil
 }
+
+func createImportRollbackJob(
+	ctx context.Context,
+	jr *jobs.Registry,
+	txn isql.Txn,
+	username username.SQLUsername,
+	tableDesc *tabledesc.Mutable,
+) error {
+	jobRecord := jobs.Record{
+		Description:   fmt.Sprintf("ROLLBACK IMPORT INTO %s", tableDesc.GetName()),
+		Username:      username,
+		NonCancelable: true,
+		DescriptorIDs: descpb.IDs{tableDesc.GetID()},
+		Details: jobspb.ImportRollbackDetails{
+			TableID: tableDesc.GetID(),
+		},
+		Progress: jobspb.ImportRollbackProgress{},
+	}
+	_, err := jr.CreateJobWithTxn(ctx, jobRecord, jr.MakeJobID(), txn)
+	return err
+}
diff --git a/pkg/ccl/backupccl/testdata/backup-restore/online-restore-in-progress-imports b/pkg/ccl/backupccl/testdata/backup-restore/online-restore-in-progress-imports
index cd99aedf5821..083c8244752b 100644
--- a/pkg/ccl/backupccl/testdata/backup-restore/online-restore-in-progress-imports
+++ b/pkg/ccl/backupccl/testdata/backup-restore/online-restore-in-progress-imports
@@ -1,5 +1,5 @@
-# This test ensures that online restore failes when restoring tables
-# undergoing an in progress import
+# This test ensures that online restore generates an IMPORT ROLLBACK
+# job.
 
 reset test-nodelocal
 ----
@@ -7,29 +7,47 @@ reset test-nodelocal
 new-cluster name=s1 disable-tenant
 ----
 
+exec-sql
+CREATE DATABASE d;
+USE d;
+CREATE TABLE foo (i INT PRIMARY KEY, s STRING);
+CREATE TABLE baz (i INT PRIMARY KEY, s STRING);
+INSERT INTO baz VALUES (1, 'x'),(2,'y'),(3,'z');
+INSERT INTO foo VALUES (4, 'a');
+----
+
 exec-sql
-SET CLUSTER SETTING jobs.debug.pausepoints = 'import.after_ingest';
+EXPORT INTO CSV 'nodelocal://1/export1/' FROM SELECT * FROM baz;
 ----
+NOTICE: EXPORT is not the recommended way to move data out of CockroachDB and may be deprecated in the future. Please consider exporting data with changefeeds instead: https://www.cockroachlabs.com/docs/stable/export-data-with-changefeeds
 
+exec-sql
+SET CLUSTER SETTING jobs.debug.pausepoints = 'import.after_ingest';
+----
 
-# Pause the import job, in order to back up the importing data.
 import expect-pausepoint tag=a
-IMPORT INTO data.bank CSV DATA ('workload:///csv/bank/bank?rows=100&version=1.0.0')
+IMPORT INTO foo (i,s) CSV DATA ('nodelocal://1/export1/export*-n*.0.csv')
 ----
 job paused at pausepoint
 
-
 exec-sql
 BACKUP INTO 'nodelocal://1/cluster/';
 ----
 
+exec-sql
+RESTORE DATABASE d FROM LATEST IN 'nodelocal://1/cluster/' WITH EXPERIMENTAL DEFERRED COPY, new_db_name=d2;
+----
 
-new-cluster name=s2 share-io-dir=s1 allow-implicit-access disable-tenant
+exec-sql
+SHOW JOB WHEN COMPLETE (SELECT id FROM system.jobs WHERE job_type = 'IMPORT ROLLBACK')
 ----
 
+query-sql
+SELECT description, status FROM [SHOW JOBS (SELECT id FROM system.jobs WHERE job_type = 'IMPORT ROLLBACK')]
+----
+ROLLBACK IMPORT INTO foo succeeded
 
-exec-sql
-RESTORE DATABASE data FROM LATEST IN 'nodelocal://1/cluster/' with EXPERIMENTAL DEFERRED COPY;
+query-sql
+SELECT count(1) FROM d2.foo
 ----
-pq: table bank (id 106) in restoring backup has an in-progress import, but online restore cannot be run on a table with an in progress import
\ No newline at end of file
+1
diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto
index 6663467c93f4..dee8d43147a5 100644
--- a/pkg/jobs/jobspb/jobs.proto
+++ b/pkg/jobs/jobspb/jobs.proto
@@ -1263,6 +1263,19 @@ message MVCCStatisticsJobProgress {
 }
 
+message ImportRollbackDetails {
+  // TableID is the descriptor ID of the table that should be rolled back.
+  //
+  // TODO(ssd): We could consider having this job process multiple
+  // tables.
+  uint32 table_id = 1 [
+    (gogoproto.customname) = "TableID",
+    (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID"
+  ];
+}
+
+message ImportRollbackProgress {}
+
 message Payload {
   string description = 1;
   // If empty, the description is assumed to be the statement.
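The two new proto messages plug into the standard jobs payload plumbing: Payload and Progress each gain a oneof case below, and wrap.go registers the wrappers. As a rough, standalone sketch of the round trip this enables — not part of the diff, and assuming the generated Payload_ImportRollbackDetails oneof wrapper — the details can be wrapped and unwrapped like so:

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
)

func main() {
	// Wrap the details into the Payload oneof, as the jobs registry does
	// when createImportRollbackJob's jobs.Record is turned into a payload.
	payload := jobspb.Payload{
		Details: jobspb.WrapPayloadDetails(jobspb.ImportRollbackDetails{TableID: 106}),
	}

	// The resumer reads the concrete struct back out of the oneof.
	details := payload.UnwrapDetails().(jobspb.ImportRollbackDetails)
	fmt.Println(details.TableID) // 106

	// DetailsType maps the oneof case to the new enum value.
	typ, err := jobspb.DetailsType(payload.Details)
	fmt.Println(typ == jobspb.TypeImportRollback, err) // true <nil>
}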
@@ -1326,6 +1339,7 @@ message Payload {
     AutoConfigTaskDetails auto_config_task = 43;
     AutoUpdateSQLActivityDetails auto_update_sql_activities = 44;
     MVCCStatisticsJobDetails mvcc_statistics_details = 45;
+    ImportRollbackDetails import_rollback_details = 46;
   }
   reserved 26;
   // PauseReason is used to describe the reason that the job is currently paused
@@ -1400,6 +1414,7 @@ message Progress {
     AutoConfigTaskProgress auto_config_task = 31;
     AutoUpdateSQLActivityProgress update_sql_activity = 32;
     MVCCStatisticsJobProgress mvcc_statistics_progress = 33;
+    ImportRollbackProgress import_rollback_progress = 34;
   }
 
   uint64 trace_id = 21 [(gogoproto.nullable) = false, (gogoproto.customname) = "TraceID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb.TraceID"];
@@ -1436,6 +1451,7 @@ enum Type {
   AUTO_CONFIG_TASK = 22 [(gogoproto.enumvalue_customname) = "TypeAutoConfigTask"];
   AUTO_UPDATE_SQL_ACTIVITY = 23 [(gogoproto.enumvalue_customname) = "TypeAutoUpdateSQLActivity"];
   MVCC_STATISTICS_UPDATE = 24 [(gogoproto.enumvalue_customname) = "TypeMVCCStatisticsUpdate"];
+  IMPORT_ROLLBACK = 25 [(gogoproto.enumvalue_customname) = "TypeImportRollback"];
 }
 
 message Job {
diff --git a/pkg/jobs/jobspb/wrap.go b/pkg/jobs/jobspb/wrap.go
index 591e30a4f6bf..d559175d4259 100644
--- a/pkg/jobs/jobspb/wrap.go
+++ b/pkg/jobs/jobspb/wrap.go
@@ -46,6 +46,7 @@ var (
 	_ Details = AutoConfigTaskDetails{}
 	_ Details = AutoUpdateSQLActivityDetails{}
 	_ Details = MVCCStatisticsJobDetails{}
+	_ Details = ImportRollbackDetails{}
 )
 
 // ProgressDetails is a marker interface for job progress details proto structs.
@@ -71,6 +72,7 @@ var (
 	_ ProgressDetails = AutoConfigTaskProgress{}
 	_ ProgressDetails = AutoUpdateSQLActivityProgress{}
 	_ ProgressDetails = MVCCStatisticsJobProgress{}
+	_ ProgressDetails = ImportRollbackProgress{}
 )
 
 // Type returns the payload's job type and panics if the type is invalid.
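Each of the wrap.go hunks below must stay in sync with the enum; a quick way to see why the NumJobTypes bump at the end of this file matters is an illustrative (hypothetical) check over the exported JobDetailsForEveryJobType map, which tests use to enumerate every job type:

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
)

func main() {
	// Every declared job type should have registered details; forgetting
	// the NumJobTypes bump (or a map entry) makes this loop miss or flag
	// the new type.
	for typ := jobspb.TypeUnspecified + 1; typ < jobspb.NumJobTypes; typ++ {
		if _, ok := jobspb.JobDetailsForEveryJobType[typ]; !ok {
			fmt.Printf("job type %v is missing from JobDetailsForEveryJobType\n", typ)
		}
	}
}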
@@ -213,6 +215,8 @@ func DetailsType(d isPayload_Details) (Type, error) {
 		return TypeAutoUpdateSQLActivity, nil
 	case *Payload_MvccStatisticsDetails:
 		return TypeMVCCStatisticsUpdate, nil
+	case *Payload_ImportRollbackDetails:
+		return TypeImportRollback, nil
 	default:
 		return TypeUnspecified, errors.Newf("Payload.Type called on a payload with an unknown details type: %T", d)
 	}
@@ -258,6 +262,7 @@ var JobDetailsForEveryJobType = map[Type]Details{
 	TypeAutoConfigTask:        AutoConfigTaskDetails{},
 	TypeAutoUpdateSQLActivity: AutoUpdateSQLActivityDetails{},
 	TypeMVCCStatisticsUpdate:  MVCCStatisticsJobDetails{},
+	TypeImportRollback:        ImportRollbackDetails{},
 }
 
 // WrapProgressDetails wraps a ProgressDetails object in the protobuf wrapper
@@ -315,6 +320,8 @@ func WrapProgressDetails(details ProgressDetails) interface {
 		return &Progress_UpdateSqlActivity{UpdateSqlActivity: &d}
 	case MVCCStatisticsJobProgress:
 		return &Progress_MvccStatisticsProgress{MvccStatisticsProgress: &d}
+	case ImportRollbackProgress:
+		return &Progress_ImportRollbackProgress{ImportRollbackProgress: &d}
 	default:
 		panic(errors.AssertionFailedf("WrapProgressDetails: unknown progress type %T", d))
 	}
@@ -370,6 +377,8 @@ func (p *Payload) UnwrapDetails() Details {
 		return *d.AutoUpdateSqlActivities
 	case *Payload_MvccStatisticsDetails:
 		return *d.MvccStatisticsDetails
+	case *Payload_ImportRollbackDetails:
+		return *d.ImportRollbackDetails
 	default:
 		return nil
 	}
@@ -425,6 +434,8 @@ func (p *Progress) UnwrapDetails() ProgressDetails {
 		return *d.UpdateSqlActivity
 	case *Progress_MvccStatisticsProgress:
 		return *d.MvccStatisticsProgress
+	case *Progress_ImportRollbackProgress:
+		return *d.ImportRollbackProgress
 	default:
 		return nil
 	}
@@ -504,6 +515,8 @@ func WrapPayloadDetails(details Details) interface {
 		return &Payload_AutoUpdateSqlActivities{AutoUpdateSqlActivities: &d}
 	case MVCCStatisticsJobDetails:
 		return &Payload_MvccStatisticsDetails{MvccStatisticsDetails: &d}
+	case ImportRollbackDetails:
+		return &Payload_ImportRollbackDetails{ImportRollbackDetails: &d}
 	default:
 		panic(errors.AssertionFailedf("jobs.WrapPayloadDetails: unknown details type %T", d))
 	}
@@ -539,7 +552,7 @@ const (
 
 func (Type) SafeValue() {}
 
 // NumJobTypes is the number of jobs types.
-const NumJobTypes = 25
+const NumJobTypes = 26
 
 // ChangefeedDetailsMarshaler allows for dependency injection of
 // cloud.SanitizeExternalStorageURI to avoid the dependency from this
diff --git a/pkg/sql/importer/BUILD.bazel b/pkg/sql/importer/BUILD.bazel
index 6a8008f0e59a..2a50ea1375bd 100644
--- a/pkg/sql/importer/BUILD.bazel
+++ b/pkg/sql/importer/BUILD.bazel
@@ -26,6 +26,7 @@ go_library(
         "read_import_pgcopy.go",
         "read_import_pgdump.go",
         "read_import_workload.go",
+        "rollback_job.go",
     ],
     importpath = "github.com/cockroachdb/cockroach/pkg/sql/importer",
     visibility = ["//visibility:public"],
diff --git a/pkg/sql/importer/import_job.go b/pkg/sql/importer/import_job.go
index c82ce86674d4..413bd19ef2e7 100644
--- a/pkg/sql/importer/import_job.go
+++ b/pkg/sql/importer/import_job.go
@@ -1261,7 +1261,7 @@ var importEpochs = settings.RegisterBoolSetting(
 	settings.ApplicationLevel,
 	"bulkio.import.write_import_epoch.enabled",
 	"controls whether IMPORT will write ImportEpoch's to descriptors",
-	false,
+	true,
 )
 
 func getFractionCompleted(job *jobs.Job) float64 {
@@ -1599,7 +1599,7 @@ func (r *importResumer) dropTables(
 			execCfg.Codec,
 			&execCfg.Settings.SV,
 			execCfg.DistSender,
-			intoTable,
+			intoTable.GetID(),
 			predicates,
 			sql.RevertTableDefaultBatchSize); err != nil {
 			return errors.Wrap(err, "rolling back IMPORT INTO in non empty table via DeleteRange")
 		}
diff --git a/pkg/sql/importer/rollback_job.go b/pkg/sql/importer/rollback_job.go
new file mode 100644
index 000000000000..2ee516707e83
--- /dev/null
+++ b/pkg/sql/importer/rollback_job.go
@@ -0,0 +1,139 @@
+// Copyright 2017 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package importer
+
+import (
+	"context"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/jobs"
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/sql"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
+	"github.com/cockroachdb/cockroach/pkg/sql/isql"
+	"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/retry"
+	"github.com/cockroachdb/errors"
+)
+
+// importRollbackResumer is an IMPORT ROLLBACK job that uses
+// ImportEpoch-based DeleteRange requests to roll back imported data.
+// This job is synthesized during online restore to provide rollback
+// of IMPORT data without relying on MVCC timestamps.
+//
+// NB: An alternative here would be to create an IMPORT job in a
+// reverting state with some new booleans to handle this
+// ImportEpoch-based rollback.
+type importRollbackResumer struct {
+	job      *jobs.Job
+	settings *cluster.Settings
+}
+
+func (r *importRollbackResumer) Resume(ctx context.Context, execCtx interface{}) error {
+	cfg := execCtx.(sql.JobExecContext).ExecCfg()
+	importRollbackPayload := r.job.Payload().Details.(*jobspb.Payload_ImportRollbackDetails).ImportRollbackDetails
+	tableID := importRollbackPayload.TableID
+
+	// We retry until paused or canceled.
+	// This job must eventually complete for the table to come back
+	// online without manual intervention.
+	retryOpts := retry.Options{
+		InitialBackoff: 10 * time.Second,
+		MaxBackoff:     10 * time.Minute,
+	}
+	for re := retry.StartWithCtx(ctx, retryOpts); re.Next(); {
+		err := r.rollbackTable(ctx, cfg, tableID)
+		if err != nil {
+			log.Errorf(ctx, "rollback of table %d failed: %s", tableID, err.Error())
+		} else {
+			return nil
+		}
+	}
+	return ctx.Err()
+}
+
+func (r *importRollbackResumer) rollbackTable(
+	ctx context.Context, cfg *sql.ExecutorConfig, tableID catid.DescID,
+) error {
+	return sql.DescsTxn(ctx, cfg, func(ctx context.Context, txn isql.Txn, descsCol *descs.Collection) error {
+		desc, err := descsCol.MutableByID(txn.KV()).Table(ctx, tableID)
+		if err != nil {
+			return errors.Wrapf(err, "looking up descriptor %d", tableID)
+		}
+
+		// TODO(ssd): We could fail here instead if we start tracking
+		// progress. Right now, this could happen in normal operation
+		// if we finish bringing the table online but then can't move
+		// the job to succeeded for some reason.
+		if desc.Public() {
+			log.Infof(ctx, "table %d already PUBLIC, cannot rollback", tableID)
+			return nil
+		}
+
+		importEpoch := desc.TableDesc().ImportEpoch
+		if importEpoch <= 0 {
+			return errors.Errorf("cannot ROLLBACK table %d with ImportEpoch = 0", tableID)
+		}
+
+		// TODO(ssd): This isn't transactional, but we
+		// currently do this inside the DescsTxn in the normal
+		// import rollback path, so I've done that here for
+		// consistency.
+		if err := sql.DeleteTableWithPredicate(
+			ctx,
+			cfg.DB,
+			cfg.Codec,
+			&cfg.Settings.SV,
+			cfg.DistSender,
+			tableID,
+			kvpb.DeleteRangePredicates{
+				ImportEpoch: importEpoch,
+			},
+			sql.RevertTableDefaultBatchSize); err != nil {
+			return errors.Wrap(err, "rolling back IMPORT INTO in non empty table via DeleteRange")
+		}
+
+		log.Infof(ctx, "transitioning table %q (%d) to PUBLIC", desc.GetName(), desc.GetID())
+		desc.SetPublic()
+		desc.FinalizeImport()
+		b := txn.KV().NewBatch()
+		if err := descsCol.WriteDescToBatch(
+			ctx, false /* kvTrace */, desc, b,
+		); err != nil {
+			return errors.Wrapf(err, "publishing table %d", desc.ID)
+		}
+		return txn.KV().Run(ctx, b)
+	})
+}
+
+func (*importRollbackResumer) CollectProfile(context.Context, interface{}) error {
+	return nil
+}
+
+func (r *importRollbackResumer) OnFailOrCancel(context.Context, interface{}, error) error {
+	return errors.AssertionFailedf("OnFailOrCancel called on non-cancellable job %d", r.job.ID())
+}
+
+func init() {
+	jobs.RegisterConstructor(
+		jobspb.TypeImportRollback,
+		func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
+			return &importRollbackResumer{
+				job:      job,
+				settings: settings,
+			}
+		},
+		jobs.UsesTenantCostControl,
+	)
+}
diff --git a/pkg/sql/revert.go b/pkg/sql/revert.go
index a13fd9d33fe4..81ca7152ab5f 100644
--- a/pkg/sql/revert.go
+++ b/pkg/sql/revert.go
@@ -19,7 +19,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings"
-	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
+	"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
 	"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -62,13 +62,13 @@ func DeleteTableWithPredicate(
 	codec keys.SQLCodec,
 	sv *settings.Values,
 	distSender *kvcoord.DistSender,
-	table catalog.TableDescriptor,
+	tableID catid.DescID,
 	predicates kvpb.DeleteRangePredicates,
 	batchSize int64,
 ) error {
-	log.Infof(ctx, "deleting data for table %d with predicate %s", table.GetID(), predicates.String())
-	tableKey := roachpb.RKey(codec.TablePrefix(uint32(table.GetID())))
+	log.Infof(ctx, "deleting data for table %d with predicate %s", tableID, predicates.String())
+	tableKey := roachpb.RKey(codec.TablePrefix(uint32(tableID)))
 	tableSpan := roachpb.RSpan{Key: tableKey, EndKey: tableKey.PrefixEnd()}
 
 	// To process the table in parallel, spin up a few workers and partition the
diff --git a/pkg/sql/revert_test.go b/pkg/sql/revert_test.go
index aa7a8e34dd58..d80ed3ec1861 100644
--- a/pkg/sql/revert_test.go
+++ b/pkg/sql/revert_test.go
@@ -66,7 +66,7 @@ func TestTableRollback(t *testing.T) {
 
 	predicates := kvpb.DeleteRangePredicates{StartTime: targetTime}
 	require.NoError(t, sql.DeleteTableWithPredicate(
-		ctx, kv, codec, sv, execCfg.DistSender, desc, predicates, 10))
+		ctx, kv, codec, sv, execCfg.DistSender, desc.GetID(), predicates, 10))
 
 	db.CheckQueryResults(t, `SELECT count(*) FROM test`, beforeNumRows)
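The revert.go change above narrows DeleteTableWithPredicate to take a descriptor ID rather than a full catalog.TableDescriptor, which is what lets the new rollback job call it with just the ID stored in its job details. A minimal sketch of a caller after this change — the helper name rollbackImportedRows is hypothetical, but the call mirrors rollbackTable in rollback_job.go above:

package importer

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/sql"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
)

// rollbackImportedRows is a hypothetical helper (not in the diff) showing
// the narrowed signature: callers now pass only the descriptor ID and the
// ImportEpoch whose writes should be deleted.
func rollbackImportedRows(
	ctx context.Context, cfg *sql.ExecutorConfig, tableID catid.DescID, importEpoch uint32,
) error {
	return sql.DeleteTableWithPredicate(
		ctx,
		cfg.DB,
		cfg.Codec,
		&cfg.Settings.SV,
		cfg.DistSender,
		tableID, // previously a catalog.TableDescriptor
		kvpb.DeleteRangePredicates{ImportEpoch: importEpoch},
		sql.RevertTableDefaultBatchSize,
	)
}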