Skip to content

Commit

Permalink
importer: add ImportStartWallTime descriptor
Browse files Browse the repository at this point in the history
This patch introduces the new ImportStartWallTime table descriptor field which
provides the time at which an in-progress import began writing data to disk.
This field is nonzero if the table is offline during an import.

In future PRs, this field will be used to:
- incrementally back up in progress imports, preventing a large BACKUP workload
 after an Import finishes.
- elide importing keys in RESTORE, ensuring a table -- with an in-progress import
  in the back up-- contains no in-progress importing keys and is available on
  the restored cluster.

Informs #85138

Release note: none
  • Loading branch information
msbutler committed Aug 12, 2022
1 parent ae2c3d6 commit 8722d84
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 18 deletions.
106 changes: 106 additions & 0 deletions pkg/ccl/backupccl/testdata/backup-restore/import-start-time
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# Test that Import properly sets and removes the ImportStartTime from the table descriptor.
# The basic idea:
# For table with or without data:
## -start and pause an import
## -check that the ImportStartTime is set on the descriptor
## -check that it's removed after cancellation / success

new-server name=s1
----

exec-sql
CREATE DATABASE d;
USE d;
CREATE TABLE foo (i INT PRIMARY KEY, s STRING);
CREATE TABLE baz (i INT PRIMARY KEY, s STRING);
INSERT INTO baz VALUES (1, 'x'),(2,'y'),(3,'z');
----


exec-sql
CREATE VIEW import_time (importStartTime)
AS WITH tbls AS (
SELECT id, crdb_internal.pb_to_json('cockroach.sql.sqlbase.Descriptor', descriptor) AS orig FROM system.descriptor
)
SELECT orig->'table'->'importStartWallTime' FROM tbls
INNER JOIN (SELECT id FROM system.namespace WHERE name='foo') AS sys
ON sys.id = tbls.id;
----

exec-sql
EXPORT INTO CSV 'nodelocal://0/export1/' FROM SELECT * FROM baz WHERE i = 1;
----

exec-sql
SET CLUSTER SETTING jobs.debug.pausepoints = 'import.after_ingest';
----

import expect-pausepoint tag=a
IMPORT INTO foo (i,s) CSV DATA ('nodelocal://0/export1/export*-n*.0.csv')
----
job paused at pausepoint


query-sql regex=^"\d.
SELECT * FROM import_time
----
true

# attempting another import on the table should fail, as there's already an in-progress import
# on the table.
exec-sql
IMPORT INTO foo (i,s) CSV DATA ('nodelocal://0/export1/export*-n*.0.csv')
----
pq: relation "foo" is offline: importing

# Cancel the job so that the cleanup hook runs, and ensure the importStartTime is 0.
job cancel=a
----

query-sql
SELECT * FROM import_time
----
<nil>

# remove the pause setting, and try the import again and ensure it succeeds.
exec-sql
SET CLUSTER SETTING jobs.debug.pausepoints = '';
----

exec-sql
IMPORT INTO foo (i,s) CSV DATA ('nodelocal://0/export1/export*-n*.0.csv')
----

query-sql
SELECT * FROM import_time
----
<nil>


# ensure importing into an existing table also modifies the descriptor properly
exec-sql
EXPORT INTO CSV 'nodelocal://0/export2/' FROM SELECT * FROM baz WHERE i = 2;
----

exec-sql
SET CLUSTER SETTING jobs.debug.pausepoints = 'import.after_ingest';
----

import expect-pausepoint tag=b
IMPORT INTO foo (i,s) CSV DATA ('nodelocal://0/export2/export*-n*.0.csv')
----
job paused at pausepoint

query-sql regex=^"\d.
SELECT * FROM import_time
----
true

# Cancel the job so that the cleanup hook runs.
job cancel=b
----

query-sql
SELECT * FROM import_time
----
<nil>
12 changes: 6 additions & 6 deletions pkg/cli/testdata/doctor/test_recreate_zipdir

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion pkg/sql/catalog/descpb/structured.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1180,7 +1180,11 @@ message TableDescriptor {
// this table, in which case the global setting is used.
optional bool forecast_stats = 52 [(gogoproto.nullable) = true, (gogoproto.customname) = "ForecastStats"];

// Next ID: 54
// ImportStartWallTime contains the start wall time of an in-progress import.
// This field is non zero if this table is offline during an import.
optional int64 import_start_wall_time = 54 [(gogoproto.nullable) = false, (gogoproto.customname) = "ImportStartWallTime"];

// Next ID: 55
}

// SurvivalGoal is the survival goal for a database.
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/catalog/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,9 @@ type TableDescriptor interface {
// IsRefreshViewRequired indicates if a REFRESH VIEW operation needs to be called
// on a materialized view.
IsRefreshViewRequired() bool
// GetInProgressImportStartTime returns the start wall time of the in progress import,
// if it exists.
GetInProgressImportStartTime() int64
}

// MutableTableDescriptor is both a MutableDescriptor and a TableDescriptor.
Expand Down
19 changes: 19 additions & 0 deletions pkg/sql/catalog/tabledesc/table_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,20 @@ func (desc *Mutable) SetPublicNonPrimaryIndex(indexOrdinal int, index descpb.Ind
desc.Indexes[indexOrdinal-1] = index
}

// InitializeImport binds the import start time to the table descriptor
func (desc *Mutable) InitializeImport(startWallTime int64) error {
if desc.ImportStartWallTime != 0 {
return errors.AssertionFailedf("Import in progress with start time %v", desc.ImportStartWallTime)
}
desc.ImportStartWallTime = startWallTime
return nil
}

// FinalizeImport removes the ImportStartTime
func (desc *Mutable) FinalizeImport() {
desc.ImportStartWallTime = 0
}

// UpdateIndexPartitioning applies the new partition and adjusts the column info
// for the specified index descriptor. Returns false iff this was a no-op.
func UpdateIndexPartitioning(
Expand Down Expand Up @@ -660,3 +674,8 @@ func (desc *wrapper) GetObjectType() privilege.ObjectType {
}
return privilege.Table
}

// GetInProgressImportStartTime returns the start wall time of the import if there's one in progress
func (desc *wrapper) GetInProgressImportStartTime() int64 {
return desc.ImportStartWallTime
}
1 change: 1 addition & 0 deletions pkg/sql/catalog/tabledesc/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ var validationMap = []struct {
"DeclarativeSchemaChangerState": {status: iSolemnlySwearThisFieldIsValidated},
"AutoStatsSettings": {status: iSolemnlySwearThisFieldIsValidated},
"ForecastStats": {status: thisFieldReferencesNoObjects},
"ImportStartWalltime": {status: thisFieldReferencesNoObjects},
},
},
{
Expand Down
43 changes: 40 additions & 3 deletions pkg/sql/importer/import_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/gcjob"
"github.com/cockroachdb/cockroach/pkg/sql/opt/memo"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
Expand Down Expand Up @@ -269,6 +270,14 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
return errors.Wrap(err, "checking if existing table is empty")
}
details.Tables[i].WasEmpty = len(res) == 0
if p.ExecCfg().Settings.Version.IsActive(ctx, clusterversion.V22_1) {
// Update the descriptor in the job record and in the database with the ImportStartTime
details.Tables[i].Desc.ImportStartWallTime = details.Walltime
err := bindImportStartTime(ctx, p, tblDesc.GetID(), details.Walltime)
if err != nil {
return err
}
}
}
}

Expand Down Expand Up @@ -388,7 +397,8 @@ func (r *importResumer) prepareTablesForIngestion(
return importDetails, err
}
importDetails.Tables[i] = jobspb.ImportDetails_Table{
Desc: desc, Name: table.Name,
Desc: desc,
Name: table.Name,
SeqVal: table.SeqVal,
IsNew: table.IsNew,
TargetCols: table.TargetCols,
Expand Down Expand Up @@ -452,10 +462,10 @@ func (r *importResumer) prepareTablesForIngestion(
// wait for all nodes to see the same descriptor version before doing so.
if !hasExistingTables {
importDetails.Walltime = p.ExecCfg().Clock.Now().WallTime
// TODO(msbutler) add import start time to IMPORT PGDUMP/MYSQL descriptor
} else {
importDetails.Walltime = 0
}

return importDetails, nil
}

Expand Down Expand Up @@ -680,6 +690,32 @@ func (r *importResumer) prepareSchemasForIngestion(
return schemaMetadata, err
}

// bindImportStarTime writes the ImportStarTime to the descriptor.
func bindImportStartTime(
ctx context.Context, p sql.JobExecContext, id catid.DescID, startWallTime int64,
) error {
if err := sql.DescsTxn(ctx, p.ExecCfg(), func(
ctx context.Context, txn *kv.Txn, descsCol *descs.Collection,
) error {
mutableDesc, err := descsCol.GetMutableTableVersionByID(ctx, id, txn)
if err != nil {
return err
}
if err := mutableDesc.InitializeImport(startWallTime); err != nil {
return err
}
if err := descsCol.WriteDesc(
ctx, false /* kvTrace */, mutableDesc, txn,
); err != nil {
return err
}
return nil
}); err != nil {
return err
}
return nil
}

// createSchemaDescriptorWithID writes a schema descriptor with `id` to disk.
func createSchemaDescriptorWithID(
ctx context.Context,
Expand Down Expand Up @@ -983,7 +1019,7 @@ func (r *importResumer) publishTables(
c.Validity = descpb.ConstraintValidity_Unvalidated
}
}

newTableDesc.FinalizeImport()
// TODO(dt): re-validate any FKs?
if err := descsCol.WriteDescToBatch(
ctx, false /* kvTrace */, newTableDesc, b,
Expand Down Expand Up @@ -1554,6 +1590,7 @@ func (r *importResumer) dropTables(
return err
}
intoDesc.SetPublic()
intoDesc.FinalizeImport()
const kvTrace = false
if err := descsCol.WriteDescToBatch(ctx, kvTrace, intoDesc, b); err != nil {
return err
Expand Down
Loading

0 comments on commit 8722d84

Please sign in to comment.