Skip to content

Commit

Permalink
sql,backupccl: create IMPORT ROLLBACK job in online restore
Browse files Browse the repository at this point in the history
Tables that are offline and have a non-zero ImportEpoch are now
restored to their offline state and an IMPORT ROLLBACK job is
synthesized.

The IMPORT ROLLBACK job uses an epoch-based predicate delete to
rollback the table and bring it back online.

Epic: none
Release note: None
  • Loading branch information
stevendanna committed Mar 19, 2024
1 parent f5cd323 commit 1615358
Show file tree
Hide file tree
Showing 10 changed files with 295 additions and 37 deletions.
73 changes: 54 additions & 19 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,18 +812,31 @@ func shouldPreRestore(table *tabledesc.Mutable) bool {

// backedUpDescriptorWithInProgressImportInto returns true if the backed up descriptor represents a table with an in
// progress import that started in a cluster finalized to version 22.2.
func backedUpDescriptorWithInProgressImportInto(
ctx context.Context, p sql.JobExecContext, desc catalog.Descriptor,
) (bool, error) {
func backedUpDescriptorWithInProgressImportInto(desc catalog.Descriptor) bool {
table, ok := desc.(catalog.TableDescriptor)
if !ok {
return false, nil
return false
}

if table.GetInProgressImportStartTime() == 0 {
return false, nil
return table.GetInProgressImportStartTime() > 0
}

func backedUpDescriptorWithImportEpoch(desc catalog.Descriptor) bool {
table, ok := desc.(catalog.TableDescriptor)
if !ok {
return false
}
return true, nil

return table.TableDesc().ImportEpoch > 0
}

func epochBasedinProgressImport(desc catalog.Descriptor) bool {
table, ok := desc.(catalog.TableDescriptor)
if !ok {
return false
}

return table.GetInProgressImportStartTime() > 0 && table.TableDesc().ImportEpoch > 0
}

// createImportingDescriptors creates the tables that we will restore into and returns up to three
Expand Down Expand Up @@ -886,22 +899,32 @@ func createImportingDescriptors(
for _, desc := range sqlDescs {
// Decide which offline tables to include in the restore:
//
// - An offline table created by RESTORE or IMPORT PGDUMP is fully discarded.
// The table will not exist in the restoring cluster.
// - An offline table created by RESTORE or IMPORT PGDUMP is
// fully discarded. The table will not exist in the restoring
// cluster.
//
// - An offline table undergoing an IMPORT INTO in traditional
// restore has all importing data elided in the restore
// processor and is restored online to its pre import state.
//
// - An offline table undergoing an IMPORT INTO has all importing data
// elided in the restore processor and is restored online to its pre import
// state.
// - An offline table undergoing an IMPORT INTO in online
// restore with no ImportEpoch cannot be restored and an error
// is returned.
//
// - An offline table undergoing an IMPORT INTO in online
// restore with an ImportEpoch is restored with an Offline
// table and a revert job is queued that will bring the table
// back online.
if desc.Offline() {
if schema, ok := desc.(catalog.SchemaDescriptor); ok {
offlineSchemas[schema.GetID()] = struct{}{}
}

if hasInProgressImportInto, err := backedUpDescriptorWithInProgressImportInto(ctx, p, desc); err != nil {
return nil, nil, nil, err
} else if hasInProgressImportInto && details.ExperimentalOnline {
return nil, nil, nil, errors.Newf("table %s (id %d) in restoring backup has an in-progress import, but online restore cannot be run on a table with an in progress import", desc.GetName(), desc.GetID())
} else if !hasInProgressImportInto {
if backedUpDescriptorWithInProgressImportInto(desc) {
if details.ExperimentalOnline && !backedUpDescriptorWithImportEpoch(desc) {
return nil, nil, nil, errors.Newf("table %s (id %d) in restoring backup has an in-progress import, but online restore cannot be run on a table with an in progress import", desc.GetName(), desc.GetID())
}
} else {
continue
}
}
Expand Down Expand Up @@ -2256,7 +2279,8 @@ func (r *restoreResumer) publishDescriptors(
// Write the new TableDescriptors and flip state over to public so they can be
// accessed.
for i := range details.TableDescs {
mutTable := all.LookupDescriptor(details.TableDescs[i].GetID()).(*tabledesc.Mutable)
desc := all.LookupDescriptor(details.TableDescs[i].GetID())
mutTable := desc.(*tabledesc.Mutable)

if details.ExperimentalOnline && mutTable.IsTable() {
// We disable automatic stats refresh on all restored tables until the
Expand Down Expand Up @@ -2310,6 +2334,13 @@ func (r *restoreResumer) publishDescriptors(
return err
}

if details.ExperimentalOnline && epochBasedinProgressImport(desc) {
if err := createImportRollbackJob(ctx,
r.execCfg.JobRegistry, txn, r.job.Payload().UsernameProto.Decode(), mutTable,
); err != nil {
return err
}
}
}
// For all of the newly created types, make type schema change jobs for any
// type descriptors that were backed up in the middle of a type schema change.
Expand Down Expand Up @@ -2340,7 +2371,11 @@ func (r *restoreResumer) publishDescriptors(
b := txn.KV().NewBatch()
if err := all.ForEachDescriptor(func(desc catalog.Descriptor) error {
d := desc.(catalog.MutableDescriptor)
d.SetPublic()
if details.ExperimentalOnline && epochBasedinProgressImport(desc) {
log.Infof(ctx, "table %q (%d) with in-progress IMPORT remaining offline", desc.GetName(), desc.GetID())
} else {
d.SetPublic()
}
return txn.Descriptors().WriteDescToBatch(ctx, kvTrace, d, b)
}); err != nil {
return err
Expand Down
23 changes: 23 additions & 0 deletions pkg/ccl/backupccl/restore_online.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
Expand Down Expand Up @@ -650,3 +653,23 @@ func (r *restoreResumer) cleanupAfterDownload(
}
return nil
}

func createImportRollbackJob(
ctx context.Context,
jr *jobs.Registry,
txn isql.Txn,
username username.SQLUsername,
tableDesc *tabledesc.Mutable,
) error {
jobRecord := jobs.Record{
Description: fmt.Sprintf("ROLLBACK IMPORT INTO %s", tableDesc.GetName()),
Username: username,
DescriptorIDs: descpb.IDs{tableDesc.GetID()},
Details: jobspb.ImportRollbackDetails{
TableID: tableDesc.GetID(),
},
Progress: jobspb.ImportRollbackProgress{},
}
_, err := jr.CreateJobWithTxn(ctx, jobRecord, jr.MakeJobID(), txn)
return err
}
Original file line number Diff line number Diff line change
@@ -1,35 +1,66 @@
# This test ensures that online restore failes when restoring tables
# undergoing an in progress import
# This test ensures that online restore generates an IMPORT ROLLBACK
# job.

reset test-nodelocal
----

new-cluster name=s1 disable-tenant
----

exec-sql
CREATE DATABASE d;
USE d;
CREATE TABLE foo (i INT PRIMARY KEY, s STRING);
CREATE TABLE baz (i INT PRIMARY KEY, s STRING);
INSERT INTO baz VALUES (1, 'x'),(2,'y'),(3,'z');
INSERT INTO foo VALUES (4, 'a');
----

exec-sql
SET CLUSTER SETTING jobs.debug.pausepoints = 'import.after_ingest';
EXPORT INTO CSV 'nodelocal://1/export1/' FROM SELECT * FROM baz;
----
NOTICE: EXPORT is not the recommended way to move data out of CockroachDB and may be deprecated in the future. Please consider exporting data with changefeeds instead: https://www.cockroachlabs.com/docs/stable/export-data-with-changefeeds



exec-sql
SET CLUSTER SETTING bulkio.backup.elide_common_prefix.enabled = false;
----

exec-sql
SET CLUSTER SETTING bulkio.import.write_import_epoch.enabled = true;
----

exec-sql
SET CLUSTER SETTING jobs.debug.pausepoints = 'import.after_ingest';
----

# Pause the import job, in order to back up the importing data.
import expect-pausepoint tag=a
IMPORT INTO data.bank CSV DATA ('workload:///csv/bank/bank?rows=100&version=1.0.0')
IMPORT INTO foo (i,s) CSV DATA ('nodelocal://1/export1/export*-n*.0.csv')
----
job paused at pausepoint


exec-sql
BACKUP INTO 'nodelocal://1/cluster/';
----


new-cluster name=s2 share-io-dir=s1 allow-implicit-access disable-tenant
new-cluster name=s2 allow-implicit-access disable-tenant
----

exec-sql
RESTORE DATABASE d FROM LATEST IN 'nodelocal://1/cluster/' WITH EXPERIMENTAL DEFERRED COPY;
----

exec-sql
RESTORE DATABASE data FROM LATEST IN 'nodelocal://1/cluster/' with EXPERIMENTAL DEFERRED COPY;
SHOW JOB WHEN COMPLETE (SELECT id FROM system.jobs WHERE job_type = 'IMPORT ROLLBACK')
----

query-sql
SELECT description, status FROM [SHOW JOBS (SELECT id FROM system.jobs WHERE job_type = 'IMPORT ROLLBACK')]
----
ROLLBACK IMPORT INTO foo succeeded

query-sql
SELECT count(1) FROM d.foo
----
pq: table bank (id 106) in restoring backup has an in-progress import, but online restore cannot be run on a table with an in progress import
1
16 changes: 16 additions & 0 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1263,6 +1263,19 @@ message MVCCStatisticsJobProgress {

}

message ImportRollbackDetails {
// TableID is the descriptor ID of table that should be rolled back.
//
// TODO(ssd): We could consider having this job process multiple
// tables.
uint32 table_id = 1 [
(gogoproto.customname) = "TableID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID"
];
}

message ImportRollbackProgress {}

message Payload {
string description = 1;
// If empty, the description is assumed to be the statement.
Expand Down Expand Up @@ -1326,6 +1339,7 @@ message Payload {
AutoConfigTaskDetails auto_config_task = 43;
AutoUpdateSQLActivityDetails auto_update_sql_activities = 44;
MVCCStatisticsJobDetails mvcc_statistics_details = 45;
ImportRollbackDetails import_rollback_details = 46;
}
reserved 26;
// PauseReason is used to describe the reason that the job is currently paused
Expand Down Expand Up @@ -1400,6 +1414,7 @@ message Progress {
AutoConfigTaskProgress auto_config_task = 31;
AutoUpdateSQLActivityProgress update_sql_activity = 32;
MVCCStatisticsJobProgress mvcc_statistics_progress = 33;
ImportRollbackProgress import_rollback_progress = 34;
}

uint64 trace_id = 21 [(gogoproto.nullable) = false, (gogoproto.customname) = "TraceID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb.TraceID"];
Expand Down Expand Up @@ -1436,6 +1451,7 @@ enum Type {
AUTO_CONFIG_TASK = 22 [(gogoproto.enumvalue_customname) = "TypeAutoConfigTask"];
AUTO_UPDATE_SQL_ACTIVITY = 23 [(gogoproto.enumvalue_customname) = "TypeAutoUpdateSQLActivity"];
MVCC_STATISTICS_UPDATE = 24 [(gogoproto.enumvalue_customname) = "TypeMVCCStatisticsUpdate"];
IMPORT_ROLLBACK = 25 [(gogoproto.enumvalue_customname) = "TypeImportRollback"];
}

message Job {
Expand Down
15 changes: 14 additions & 1 deletion pkg/jobs/jobspb/wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ var (
_ Details = AutoConfigTaskDetails{}
_ Details = AutoUpdateSQLActivityDetails{}
_ Details = MVCCStatisticsJobDetails{}
_ Details = ImportRollbackDetails{}
)

// ProgressDetails is a marker interface for job progress details proto structs.
Expand All @@ -71,6 +72,7 @@ var (
_ ProgressDetails = AutoConfigTaskProgress{}
_ ProgressDetails = AutoUpdateSQLActivityProgress{}
_ ProgressDetails = MVCCStatisticsJobProgress{}
_ ProgressDetails = ImportRollbackProgress{}
)

// Type returns the payload's job type and panics if the type is invalid.
Expand Down Expand Up @@ -213,6 +215,8 @@ func DetailsType(d isPayload_Details) (Type, error) {
return TypeAutoUpdateSQLActivity, nil
case *Payload_MvccStatisticsDetails:
return TypeMVCCStatisticsUpdate, nil
case *Payload_ImportRollbackDetails:
return TypeImportRollback, nil
default:
return TypeUnspecified, errors.Newf("Payload.Type called on a payload with an unknown details type: %T", d)
}
Expand Down Expand Up @@ -258,6 +262,7 @@ var JobDetailsForEveryJobType = map[Type]Details{
TypeAutoConfigTask: AutoConfigTaskDetails{},
TypeAutoUpdateSQLActivity: AutoUpdateSQLActivityDetails{},
TypeMVCCStatisticsUpdate: MVCCStatisticsJobDetails{},
TypeImportRollback: ImportRollbackDetails{},
}

// WrapProgressDetails wraps a ProgressDetails object in the protobuf wrapper
Expand Down Expand Up @@ -315,6 +320,8 @@ func WrapProgressDetails(details ProgressDetails) interface {
return &Progress_UpdateSqlActivity{UpdateSqlActivity: &d}
case MVCCStatisticsJobProgress:
return &Progress_MvccStatisticsProgress{MvccStatisticsProgress: &d}
case ImportRollbackProgress:
return &Progress_ImportRollbackProgress{ImportRollbackProgress: &d}
default:
panic(errors.AssertionFailedf("WrapProgressDetails: unknown progress type %T", d))
}
Expand Down Expand Up @@ -370,6 +377,8 @@ func (p *Payload) UnwrapDetails() Details {
return *d.AutoUpdateSqlActivities
case *Payload_MvccStatisticsDetails:
return *d.MvccStatisticsDetails
case *Payload_ImportRollbackDetails:
return *d.ImportRollbackDetails
default:
return nil
}
Expand Down Expand Up @@ -425,6 +434,8 @@ func (p *Progress) UnwrapDetails() ProgressDetails {
return *d.UpdateSqlActivity
case *Progress_MvccStatisticsProgress:
return *d.MvccStatisticsProgress
case *Progress_ImportRollbackProgress:
return *d.ImportRollbackProgress
default:
return nil
}
Expand Down Expand Up @@ -504,6 +515,8 @@ func WrapPayloadDetails(details Details) interface {
return &Payload_AutoUpdateSqlActivities{AutoUpdateSqlActivities: &d}
case MVCCStatisticsJobDetails:
return &Payload_MvccStatisticsDetails{MvccStatisticsDetails: &d}
case ImportRollbackDetails:
return &Payload_ImportRollbackDetails{ImportRollbackDetails: &d}
default:
panic(errors.AssertionFailedf("jobs.WrapPayloadDetails: unknown details type %T", d))
}
Expand Down Expand Up @@ -539,7 +552,7 @@ const (
func (Type) SafeValue() {}

// NumJobTypes is the number of jobs types.
const NumJobTypes = 25
const NumJobTypes = 26

// ChangefeedDetailsMarshaler allows for dependency injection of
// cloud.SanitizeExternalStorageURI to avoid the dependency from this
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ go_library(
"read_import_pgcopy.go",
"read_import_pgdump.go",
"read_import_workload.go",
"rollback_job.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/importer",
visibility = ["//visibility:public"],
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/importer/import_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1261,7 +1261,7 @@ var importEpochs = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"bulkio.import.write_import_epoch.enabled",
"controls whether IMPORT will write ImportEpoch's to descriptors",
false,
true,
)

func getFractionCompleted(job *jobs.Job) float64 {
Expand Down Expand Up @@ -1599,7 +1599,7 @@ func (r *importResumer) dropTables(
execCfg.Codec,
&execCfg.Settings.SV,
execCfg.DistSender,
intoTable,
intoTable.GetID(),
predicates, sql.RevertTableDefaultBatchSize); err != nil {
return errors.Wrap(err, "rolling back IMPORT INTO in non empty table via DeleteRange")
}
Expand Down
Loading

0 comments on commit 1615358

Please sign in to comment.