Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql,backupccl: create IMPORT ROLLBACK job in online restore #120407

Merged
merged 1 commit into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -1095,6 +1095,18 @@
<tr><td>APPLICATION</td><td>jobs.import.resume_completed</td><td>Number of import jobs which successfully resumed to completion</td><td>jobs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>jobs.import.resume_failed</td><td>Number of import jobs which failed with a non-retriable error</td><td>jobs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>jobs.import.resume_retry_error</td><td>Number of import jobs which failed with a retriable error</td><td>jobs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>jobs.import_rollback.currently_idle</td><td>Number of import_rollback jobs currently considered Idle and can be freely shut down</td><td>jobs</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>jobs.import_rollback.currently_paused</td><td>Number of import_rollback jobs currently considered Paused</td><td>jobs</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>jobs.import_rollback.currently_running</td><td>Number of import_rollback jobs currently running in Resume or OnFailOrCancel state</td><td>jobs</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>jobs.import_rollback.expired_pts_records</td><td>Number of expired protected timestamp records owned by import_rollback jobs</td><td>records</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>jobs.import_rollback.fail_or_cancel_completed</td><td>Number of import_rollback jobs which successfully completed their failure or cancelation process</td><td>jobs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>jobs.import_rollback.fail_or_cancel_failed</td><td>Number of import_rollback jobs which failed with a non-retriable error on their failure or cancelation process</td><td>jobs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>jobs.import_rollback.fail_or_cancel_retry_error</td><td>Number of import_rollback jobs which failed with a retriable error on their failure or cancelation process</td><td>jobs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>jobs.import_rollback.protected_age_sec</td><td>The age of the oldest PTS record protected by import_rollback jobs</td><td>seconds</td><td>GAUGE</td><td>SECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>jobs.import_rollback.protected_record_count</td><td>Number of protected timestamp records held by import_rollback jobs</td><td>records</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>jobs.import_rollback.resume_completed</td><td>Number of import_rollback jobs which successfully resumed to completion</td><td>jobs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>jobs.import_rollback.resume_failed</td><td>Number of import_rollback jobs which failed with a non-retriable error</td><td>jobs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>jobs.import_rollback.resume_retry_error</td><td>Number of import_rollback jobs which failed with a retriable error</td><td>jobs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>jobs.key_visualizer.currently_idle</td><td>Number of key_visualizer jobs currently considered Idle and can be freely shut down</td><td>jobs</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>jobs.key_visualizer.currently_paused</td><td>Number of key_visualizer jobs currently considered Paused</td><td>jobs</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>jobs.key_visualizer.currently_running</td><td>Number of key_visualizer jobs currently running in Resume or OnFailOrCancel state</td><td>jobs</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
Expand Down
75 changes: 52 additions & 23 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,18 +812,25 @@ func shouldPreRestore(table *tabledesc.Mutable) bool {

// backedUpDescriptorWithInProgressImportInto returns true if the backed up descriptor represents a table with an in
// progress import that started in a cluster finalized to version 22.2.
func backedUpDescriptorWithInProgressImportInto(
ctx context.Context, p sql.JobExecContext, desc catalog.Descriptor,
) (bool, error) {
func backedUpDescriptorWithInProgressImportInto(desc catalog.Descriptor) bool {
table, ok := desc.(catalog.TableDescriptor)
if !ok {
return false, nil
return false
}

if table.GetInProgressImportStartTime() == 0 {
return false, nil
return table.GetInProgressImportStartTime() > 0
}

// epochBasedInProgressImport returns true if the backed-up descriptor
// represents a table with an in-progress import that recorded an
// ImportEpoch on the descriptor.
func epochBasedInProgressImport(desc catalog.Descriptor) bool {
	table, ok := desc.(catalog.TableDescriptor)
	if !ok {
		// Only table descriptors can be undergoing an import.
		return false
	}
	// Both must hold: the table is mid-import (non-zero start time) and
	// the import stamped a non-zero epoch on the descriptor.
	return table.GetInProgressImportStartTime() > 0 && table.TableDesc().ImportEpoch > 0
}

// createImportingDescriptors creates the tables that we will restore into and returns up to three
Expand Down Expand Up @@ -886,22 +893,32 @@ func createImportingDescriptors(
for _, desc := range sqlDescs {
// Decide which offline tables to include in the restore:
//
// - An offline table created by RESTORE or IMPORT PGDUMP is fully discarded.
// The table will not exist in the restoring cluster.
// - An offline table created by RESTORE or IMPORT PGDUMP is
// fully discarded. The table will not exist in the restoring
// cluster.
//
// - An offline table undergoing an IMPORT INTO in traditional
// restore has all importing data elided in the restore
// processor and is restored online to its pre import state.
//
// - An offline table undergoing an IMPORT INTO in online
// restore with no ImportEpoch cannot be restored and an error
// is returned.
//
// - An offline table undergoing an IMPORT INTO has all importing data
// elided in the restore processor and is restored online to its pre import
// state.
// - An offline table undergoing an IMPORT INTO in online
// restore with an ImportEpoch is restored with an Offline
// table and a revert job is queued that will bring the table
// back online.
if desc.Offline() {
if schema, ok := desc.(catalog.SchemaDescriptor); ok {
offlineSchemas[schema.GetID()] = struct{}{}
}

if hasInProgressImportInto, err := backedUpDescriptorWithInProgressImportInto(ctx, p, desc); err != nil {
return nil, nil, nil, err
} else if hasInProgressImportInto && details.ExperimentalOnline {
return nil, nil, nil, errors.Newf("table %s (id %d) in restoring backup has an in-progress import, but online restore cannot be run on a table with an in progress import", desc.GetName(), desc.GetID())
} else if !hasInProgressImportInto {
if backedUpDescriptorWithInProgressImportInto(desc) {
if details.ExperimentalOnline && !epochBasedInProgressImport(desc) {
return nil, nil, nil, errors.Newf("table %s (id %d) in restoring backup has an in-progress import, but online restore cannot be run on a table with an in progress import", desc.GetName(), desc.GetID())
}
} else {
continue
}
}
Expand Down Expand Up @@ -2256,7 +2273,8 @@ func (r *restoreResumer) publishDescriptors(
// Write the new TableDescriptors and flip state over to public so they can be
// accessed.
for i := range details.TableDescs {
mutTable := all.LookupDescriptor(details.TableDescs[i].GetID()).(*tabledesc.Mutable)
desc := all.LookupDescriptor(details.TableDescs[i].GetID())
mutTable := desc.(*tabledesc.Mutable)

if details.ExperimentalOnline && mutTable.IsTable() {
// We disable automatic stats refresh on all restored tables until the
Expand Down Expand Up @@ -2296,10 +2314,6 @@ func (r *restoreResumer) publishDescriptors(
mutTable.RowLevelTTL.ScheduleID = j.ScheduleID()
}

// If this was an importing table, it is now effectively _not_
// importing.
mutTable.FinalizeImport()

newTables = append(newTables, mutTable.TableDesc())

// Convert any mutations that were in progress on the table descriptor
Expand All @@ -2310,6 +2324,17 @@ func (r *restoreResumer) publishDescriptors(
return err
}

if details.ExperimentalOnline && epochBasedInProgressImport(desc) {
if err := createImportRollbackJob(ctx,
r.execCfg.JobRegistry, txn, r.job.Payload().UsernameProto.Decode(), mutTable,
); err != nil {
return err
}
} else {
// If this was an importing table, it is now effectively _not_
// importing.
mutTable.FinalizeImport()
}
}
// For all of the newly created types, make type schema change jobs for any
// type descriptors that were backed up in the middle of a type schema change.
Expand Down Expand Up @@ -2340,7 +2365,11 @@ func (r *restoreResumer) publishDescriptors(
b := txn.KV().NewBatch()
if err := all.ForEachDescriptor(func(desc catalog.Descriptor) error {
d := desc.(catalog.MutableDescriptor)
d.SetPublic()
if details.ExperimentalOnline && epochBasedInProgressImport(desc) {
log.Infof(ctx, "table %q (%d) with in-progress IMPORT remaining offline", desc.GetName(), desc.GetID())
} else {
d.SetPublic()
}
return txn.Descriptors().WriteDescToBatch(ctx, kvTrace, d, b)
}); err != nil {
return err
Expand Down
24 changes: 24 additions & 0 deletions pkg/ccl/backupccl/restore_online.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
Expand Down Expand Up @@ -650,3 +653,24 @@ func (r *restoreResumer) cleanupAfterDownload(
}
return nil
}

// createImportRollbackJob enqueues a non-cancelable IMPORT ROLLBACK job for
// the given table within the caller's transaction. The job rolls back the
// in-progress import on that single table (see ImportRollbackDetails).
func createImportRollbackJob(
	ctx context.Context,
	jr *jobs.Registry,
	txn isql.Txn,
	username username.SQLUsername,
	tableDesc *tabledesc.Mutable,
) error {
	// Build the job record up front; the job targets exactly one table.
	rec := jobs.Record{
		Description:   fmt.Sprintf("ROLLBACK IMPORT INTO %s", tableDesc.GetName()),
		Username:      username,
		NonCancelable: true,
		DescriptorIDs: descpb.IDs{tableDesc.GetID()},
		Details:       jobspb.ImportRollbackDetails{TableID: tableDesc.GetID()},
		Progress:      jobspb.ImportRollbackProgress{},
	}
	if _, err := jr.CreateJobWithTxn(ctx, rec, jr.MakeJobID(), txn); err != nil {
		return err
	}
	return nil
}
Original file line number Diff line number Diff line change
@@ -1,35 +1,53 @@
# This test ensures that online restore fails when restoring tables
# undergoing an in-progress import
# This test ensures that online restore generates an IMPORT ROLLBACK
# job.

reset test-nodelocal
----

new-cluster name=s1 disable-tenant
----

exec-sql
CREATE DATABASE d;
USE d;
CREATE TABLE foo (i INT PRIMARY KEY, s STRING);
CREATE TABLE baz (i INT PRIMARY KEY, s STRING);
INSERT INTO baz VALUES (1, 'x'),(2,'y'),(3,'z');
INSERT INTO foo VALUES (4, 'a');
----

exec-sql
SET CLUSTER SETTING jobs.debug.pausepoints = 'import.after_ingest';
EXPORT INTO CSV 'nodelocal://1/export1/' FROM SELECT * FROM baz;
----
NOTICE: EXPORT is not the recommended way to move data out of CockroachDB and may be deprecated in the future. Please consider exporting data with changefeeds instead: https://www.cockroachlabs.com/docs/stable/export-data-with-changefeeds

exec-sql
SET CLUSTER SETTING jobs.debug.pausepoints = 'import.after_ingest';
----

# Pause the import job, in order to back up the importing data.
import expect-pausepoint tag=a
IMPORT INTO data.bank CSV DATA ('workload:///csv/bank/bank?rows=100&version=1.0.0')
IMPORT INTO foo (i,s) CSV DATA ('nodelocal://1/export1/export*-n*.0.csv')
----
job paused at pausepoint


exec-sql
BACKUP INTO 'nodelocal://1/cluster/';
----

exec-sql
RESTORE DATABASE d FROM LATEST IN 'nodelocal://1/cluster/' WITH EXPERIMENTAL DEFERRED COPY, new_db_name=d2;
----

new-cluster name=s2 share-io-dir=s1 allow-implicit-access disable-tenant
exec-sql
SHOW JOB WHEN COMPLETE (SELECT id FROM system.jobs WHERE job_type = 'IMPORT ROLLBACK')
----

query-sql
SELECT description, status FROM [SHOW JOBS (SELECT id FROM system.jobs WHERE job_type = 'IMPORT ROLLBACK')]
----
ROLLBACK IMPORT INTO foo succeeded

exec-sql
RESTORE DATABASE data FROM LATEST IN 'nodelocal://1/cluster/' with EXPERIMENTAL DEFERRED COPY;
query-sql
SELECT count(1) FROM d2.foo
----
pq: table bank (id 106) in restoring backup has an in-progress import, but online restore cannot be run on a table with an in progress import
1
16 changes: 16 additions & 0 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1263,6 +1263,19 @@ message MVCCStatisticsJobProgress {

}

// ImportRollbackDetails configures an IMPORT ROLLBACK job, which reverts a
// table left offline by an in-progress IMPORT INTO.
message ImportRollbackDetails {
// TableID is the descriptor ID of table that should be rolled back.
//
// TODO(ssd): We could consider having this job process multiple
// tables.
uint32 table_id = 1 [
(gogoproto.customname) = "TableID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID"
];
}

// ImportRollbackProgress tracks the progress of an IMPORT ROLLBACK job.
// It currently carries no fields.
message ImportRollbackProgress {}

message Payload {
string description = 1;
// If empty, the description is assumed to be the statement.
Expand Down Expand Up @@ -1326,6 +1339,7 @@ message Payload {
AutoConfigTaskDetails auto_config_task = 43;
AutoUpdateSQLActivityDetails auto_update_sql_activities = 44;
MVCCStatisticsJobDetails mvcc_statistics_details = 45;
ImportRollbackDetails import_rollback_details = 46;
}
reserved 26;
// PauseReason is used to describe the reason that the job is currently paused
Expand Down Expand Up @@ -1400,6 +1414,7 @@ message Progress {
AutoConfigTaskProgress auto_config_task = 31;
AutoUpdateSQLActivityProgress update_sql_activity = 32;
MVCCStatisticsJobProgress mvcc_statistics_progress = 33;
ImportRollbackProgress import_rollback_progress = 34;
}

uint64 trace_id = 21 [(gogoproto.nullable) = false, (gogoproto.customname) = "TraceID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb.TraceID"];
Expand Down Expand Up @@ -1436,6 +1451,7 @@ enum Type {
AUTO_CONFIG_TASK = 22 [(gogoproto.enumvalue_customname) = "TypeAutoConfigTask"];
AUTO_UPDATE_SQL_ACTIVITY = 23 [(gogoproto.enumvalue_customname) = "TypeAutoUpdateSQLActivity"];
MVCC_STATISTICS_UPDATE = 24 [(gogoproto.enumvalue_customname) = "TypeMVCCStatisticsUpdate"];
IMPORT_ROLLBACK = 25 [(gogoproto.enumvalue_customname) = "TypeImportRollback"];
}

message Job {
Expand Down
15 changes: 14 additions & 1 deletion pkg/jobs/jobspb/wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ var (
_ Details = AutoConfigTaskDetails{}
_ Details = AutoUpdateSQLActivityDetails{}
_ Details = MVCCStatisticsJobDetails{}
_ Details = ImportRollbackDetails{}
)

// ProgressDetails is a marker interface for job progress details proto structs.
Expand All @@ -71,6 +72,7 @@ var (
_ ProgressDetails = AutoConfigTaskProgress{}
_ ProgressDetails = AutoUpdateSQLActivityProgress{}
_ ProgressDetails = MVCCStatisticsJobProgress{}
_ ProgressDetails = ImportRollbackProgress{}
)

// Type returns the payload's job type and panics if the type is invalid.
Expand Down Expand Up @@ -213,6 +215,8 @@ func DetailsType(d isPayload_Details) (Type, error) {
return TypeAutoUpdateSQLActivity, nil
case *Payload_MvccStatisticsDetails:
return TypeMVCCStatisticsUpdate, nil
case *Payload_ImportRollbackDetails:
return TypeImportRollback, nil
default:
return TypeUnspecified, errors.Newf("Payload.Type called on a payload with an unknown details type: %T", d)
}
Expand Down Expand Up @@ -258,6 +262,7 @@ var JobDetailsForEveryJobType = map[Type]Details{
TypeAutoConfigTask: AutoConfigTaskDetails{},
TypeAutoUpdateSQLActivity: AutoUpdateSQLActivityDetails{},
TypeMVCCStatisticsUpdate: MVCCStatisticsJobDetails{},
TypeImportRollback: ImportRollbackDetails{},
}

// WrapProgressDetails wraps a ProgressDetails object in the protobuf wrapper
Expand Down Expand Up @@ -315,6 +320,8 @@ func WrapProgressDetails(details ProgressDetails) interface {
return &Progress_UpdateSqlActivity{UpdateSqlActivity: &d}
case MVCCStatisticsJobProgress:
return &Progress_MvccStatisticsProgress{MvccStatisticsProgress: &d}
case ImportRollbackProgress:
return &Progress_ImportRollbackProgress{ImportRollbackProgress: &d}
default:
panic(errors.AssertionFailedf("WrapProgressDetails: unknown progress type %T", d))
}
Expand Down Expand Up @@ -370,6 +377,8 @@ func (p *Payload) UnwrapDetails() Details {
return *d.AutoUpdateSqlActivities
case *Payload_MvccStatisticsDetails:
return *d.MvccStatisticsDetails
case *Payload_ImportRollbackDetails:
return *d.ImportRollbackDetails
default:
return nil
}
Expand Down Expand Up @@ -425,6 +434,8 @@ func (p *Progress) UnwrapDetails() ProgressDetails {
return *d.UpdateSqlActivity
case *Progress_MvccStatisticsProgress:
return *d.MvccStatisticsProgress
case *Progress_ImportRollbackProgress:
return *d.ImportRollbackProgress
default:
return nil
}
Expand Down Expand Up @@ -504,6 +515,8 @@ func WrapPayloadDetails(details Details) interface {
return &Payload_AutoUpdateSqlActivities{AutoUpdateSqlActivities: &d}
case MVCCStatisticsJobDetails:
return &Payload_MvccStatisticsDetails{MvccStatisticsDetails: &d}
case ImportRollbackDetails:
return &Payload_ImportRollbackDetails{ImportRollbackDetails: &d}
default:
panic(errors.AssertionFailedf("jobs.WrapPayloadDetails: unknown details type %T", d))
}
Expand Down Expand Up @@ -539,7 +552,7 @@ const (
func (Type) SafeValue() {}

// NumJobTypes is the number of jobs types.
const NumJobTypes = 25
const NumJobTypes = 26

// ChangefeedDetailsMarshaler allows for dependency injection of
// cloud.SanitizeExternalStorageURI to avoid the dependency from this
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ go_library(
"read_import_pgcopy.go",
"read_import_pgdump.go",
"read_import_workload.go",
"rollback_job.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/importer",
visibility = ["//visibility:public"],
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/importer/import_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1261,7 +1261,7 @@ var importEpochs = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"bulkio.import.write_import_epoch.enabled",
"controls whether IMPORT will write ImportEpoch's to descriptors",
false,
true,
)

func getFractionCompleted(job *jobs.Job) float64 {
Expand Down Expand Up @@ -1599,7 +1599,7 @@ func (r *importResumer) dropTables(
execCfg.Codec,
&execCfg.Settings.SV,
execCfg.DistSender,
intoTable,
intoTable.GetID(),
predicates, sql.RevertTableDefaultBatchSize); err != nil {
return errors.Wrap(err, "rolling back IMPORT INTO in non empty table via DeleteRange")
}
Expand Down
Loading
Loading