importer: add ImportStartWallTime table descriptor field #85852

Merged
40 changes: 39 additions & 1 deletion pkg/ccl/backupccl/datadriven_test.go
@@ -13,6 +13,7 @@ import (
gosql "database/sql"
"fmt"
"net/url"
"regexp"
"strings"
"testing"
"time"
@@ -240,9 +241,12 @@ func (d *datadrivenTestState) getSQLDB(t *testing.T, server string, user string)
// + ignore-notice: does not print out the notice that is buffered during
// query execution.
//
// - "query-sql [server=<name>] [user=<name>]"
// - "query-sql [server=<name>] [user=<name>] [regex=<regex pattern>]"
// Executes the input SQL query and prints the results.
//
// + regex: returns "true" if the query result matches the regex pattern and
// "false" otherwise.
//
// - "reset"
// Clear all state associated with the test.
//
@@ -472,6 +476,16 @@ func TestDataDriven(t *testing.T) {
}
output, err := sqlutils.RowsToDataDrivenOutput(rows)
require.NoError(t, err)
if d.HasArg("regex") {
var pattern string
d.ScanArgs(t, "regex", &pattern)
matched, err := regexp.MatchString(pattern, output)
require.NoError(t, err)
if matched {
return "true"
}
return "false"
}
return output

case "let":
@@ -525,6 +539,30 @@ func TestDataDriven(t *testing.T) {
require.NoError(t, err)
return ""

case "import":
server := lastCreatedServer
user := "root"
jobType := "IMPORT"

// First, run the import.
_, err := ds.getSQLDB(t, server, user).Exec(d.Input)

// Tag the job.
if d.HasArg("tag") {
tagJob(t, server, user, jobType, ds, d)
}

// Check if we expect a pausepoint error.
if d.HasArg("expect-pausepoint") {
expectPausepoint(t, err, jobType, server, user, ds)
ret := append(ds.noticeBuffer, "job paused at pausepoint")
return strings.Join(ret, "\n")
}

// All other errors are bad.
require.NoError(t, err)
return ""

case "restore":
server := lastCreatedServer
user := "root"
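For reference, a minimal self-contained sketch (not part of this diff) of what the new regex argument does: the harness applies the pattern to the full query output with regexp.MatchString and returns the literal string "true" or "false". The sample output value here is hypothetical.

package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Hypothetical query output: pb_to_json renders a set int64 field as a
	// quoted JSON string.
	output := `"1659383984154535000"`

	// The same call the test harness makes; patterns are unanchored unless
	// anchored explicitly, as with the leading ^ used in the testdata below.
	matched, err := regexp.MatchString(`^"\d.`, output)
	if err != nil {
		panic(err)
	}
	fmt.Println(matched) // prints: true
}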
106 changes: 106 additions & 0 deletions pkg/ccl/backupccl/testdata/backup-restore/import-start-time
@@ -0,0 +1,106 @@
# Test that IMPORT properly sets and removes the ImportStartWallTime from the table descriptor.
# The basic idea, for a table with or without data:
## - start and pause an import
## - check that the ImportStartWallTime is set on the descriptor
## - check that it's removed after cancellation / success

new-server name=s1
----

exec-sql
CREATE DATABASE d;
USE d;
CREATE TABLE foo (i INT PRIMARY KEY, s STRING);
CREATE TABLE baz (i INT PRIMARY KEY, s STRING);
INSERT INTO baz VALUES (1, 'x'),(2,'y'),(3,'z');
----


exec-sql
CREATE VIEW import_time (importStartTime)
AS WITH tbls AS (
SELECT id, crdb_internal.pb_to_json('cockroach.sql.sqlbase.Descriptor', descriptor) AS orig FROM system.descriptor
)
SELECT orig->'table'->'importStartWallTime' FROM tbls
INNER JOIN (SELECT id FROM system.namespace WHERE name='foo') AS sys
ON sys.id = tbls.id;
----

exec-sql
EXPORT INTO CSV 'nodelocal://0/export1/' FROM SELECT * FROM baz WHERE i = 1;
----

exec-sql
SET CLUSTER SETTING jobs.debug.pausepoints = 'import.after_ingest';
----

import expect-pausepoint tag=a
IMPORT INTO foo (i,s) CSV DATA ('nodelocal://0/export1/export*-n*.0.csv')
----
job paused at pausepoint


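# Note: pb_to_json renders a set int64 field as a quoted string, so the
# regex below matches a leading '"' followed by a digit.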
query-sql regex=^"\d.
SELECT * FROM import_time
----
true

# Attempting another import into the table should fail, as there's already an
# in-progress import on the table.
exec-sql
IMPORT INTO foo (i,s) CSV DATA ('nodelocal://0/export1/export*-n*.0.csv')
----
pq: relation "foo" is offline: importing

# Cancel the job so that the cleanup hook runs, and ensure the
# ImportStartWallTime is cleared (pb_to_json omits the zeroed field, hence <nil>).
job cancel=a
----

query-sql
SELECT * FROM import_time
----
<nil>

# Remove the pause setting, try the import again, and ensure it succeeds.
exec-sql
SET CLUSTER SETTING jobs.debug.pausepoints = '';
----

exec-sql
IMPORT INTO foo (i,s) CSV DATA ('nodelocal://0/export1/export*-n*.0.csv')
----

query-sql
SELECT * FROM import_time
----
<nil>


# Ensure that importing into a table with existing data also modifies the
# descriptor properly.
exec-sql
EXPORT INTO CSV 'nodelocal://0/export2/' FROM SELECT * FROM baz WHERE i = 2;
----

exec-sql
SET CLUSTER SETTING jobs.debug.pausepoints = 'import.after_ingest';
----

import expect-pausepoint tag=b
IMPORT INTO foo (i,s) CSV DATA ('nodelocal://0/export2/export*-n*.0.csv')
----
job paused at pausepoint

query-sql regex=^"\d.
SELECT * FROM import_time
----
true

# Cancel the job so that the cleanup hook runs.
job cancel=b
----

query-sql
SELECT * FROM import_time
----
<nil>
12 changes: 6 additions & 6 deletions pkg/cli/testdata/doctor/test_recreate_zipdir

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion pkg/sql/catalog/descpb/structured.proto
@@ -1180,7 +1180,11 @@ message TableDescriptor {
// this table, in which case the global setting is used.
optional bool forecast_stats = 52 [(gogoproto.nullable) = true, (gogoproto.customname) = "ForecastStats"];

// Next ID: 54
// ImportStartWallTime contains the start wall time of an in-progress import.
// This field is non-zero if this table is offline during an import.
optional int64 import_start_wall_time = 54 [(gogoproto.nullable) = false, (gogoproto.customname) = "ImportStartWallTime"];

// Next ID: 55
}

// SurvivalGoal is the survival goal for a database.
3 changes: 3 additions & 0 deletions pkg/sql/catalog/descriptor.go
@@ -721,6 +721,9 @@ type TableDescriptor interface {
// IsRefreshViewRequired indicates if a REFRESH VIEW operation needs to be called
// on a materialized view.
IsRefreshViewRequired() bool
// GetInProgressImportStartTime returns the start wall time of the in-progress
// import, if one exists.
GetInProgressImportStartTime() int64
}

// MutableTableDescriptor is both a MutableDescriptor and a TableDescriptor.
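As a usage note, a minimal sketch of how a caller might consume the new accessor; the surrounding check is hypothetical, and desc is assumed to be any catalog.TableDescriptor:

// Hypothetical consumer-side check (not in this diff). A zero value means no
// import is in progress, since the field is cleared on publish and rollback.
if ts := desc.GetInProgressImportStartTime(); ts != 0 {
	// The table is offline for an in-progress import that began at wall time ts.
}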
19 changes: 19 additions & 0 deletions pkg/sql/catalog/tabledesc/table_desc.go
@@ -191,6 +191,20 @@ func (desc *Mutable) SetPublicNonPrimaryIndex(indexOrdinal int, index descpb.Ind
desc.Indexes[indexOrdinal-1] = index
}

// InitializeImport binds the import start time to the table descriptor.
func (desc *Mutable) InitializeImport(startWallTime int64) error {
if desc.ImportStartWallTime != 0 {
return errors.AssertionFailedf("import already in progress with start time %v", desc.ImportStartWallTime)
}
desc.ImportStartWallTime = startWallTime
return nil
}

// FinalizeImport clears the ImportStartWallTime.
func (desc *Mutable) FinalizeImport() {
desc.ImportStartWallTime = 0
}

// UpdateIndexPartitioning applies the new partition and adjusts the column info
// for the specified index descriptor. Returns false iff this was a no-op.
func UpdateIndexPartitioning(
Expand Down Expand Up @@ -660,3 +674,8 @@ func (desc *wrapper) GetObjectType() privilege.ObjectType {
}
return privilege.Table
}

// GetInProgressImportStartTime returns the start wall time of the import,
// if one is in progress.
func (desc *wrapper) GetInProgressImportStartTime() int64 {
return desc.ImportStartWallTime
}
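Tying the new methods together, a minimal sketch (hypothetical helper, not part of this diff) of the lifecycle they imply for a *tabledesc.Mutable obtained from a descs.Collection:

func importLifecycleSketch(desc *tabledesc.Mutable, wallTime int64) error {
	// Stamp the descriptor when ingestion begins; this errors if another
	// import has already stamped it.
	if err := desc.InitializeImport(wallTime); err != nil {
		return err
	}
	// While the import runs, readers can observe the start time.
	_ = desc.GetInProgressImportStartTime() // == wallTime
	// On publish (success) or rollback (cancellation), clear the field.
	desc.FinalizeImport()
	return nil
}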
1 change: 1 addition & 0 deletions pkg/sql/catalog/tabledesc/validate_test.go
@@ -132,6 +132,7 @@ var validationMap = []struct {
"DeclarativeSchemaChangerState": {status: iSolemnlySwearThisFieldIsValidated},
"AutoStatsSettings": {status: iSolemnlySwearThisFieldIsValidated},
"ForecastStats": {status: thisFieldReferencesNoObjects},
"ImportStartWallTime": {status: thisFieldReferencesNoObjects},
},
},
{
43 changes: 40 additions & 3 deletions pkg/sql/importer/import_job.go
@@ -43,6 +43,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/gcjob"
"github.com/cockroachdb/cockroach/pkg/sql/opt/memo"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
@@ -269,6 +270,14 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
return errors.Wrap(err, "checking if existing table is empty")
}
details.Tables[i].WasEmpty = len(res) == 0
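// Gate the new descriptor write on the cluster version, presumably so
// that nodes running an older binary in a mixed-version cluster never
// see the new field.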
if p.ExecCfg().Settings.Version.IsActive(ctx, clusterversion.V22_1) {
// Update the descriptor in the job record and in the database with the
// ImportStartWallTime.
details.Tables[i].Desc.ImportStartWallTime = details.Walltime
err := bindImportStartTime(ctx, p, tblDesc.GetID(), details.Walltime)
if err != nil {
return err
}
}
}
}

@@ -388,7 +397,8 @@ func (r *importResumer) prepareTablesForIngestion(
return importDetails, err
}
importDetails.Tables[i] = jobspb.ImportDetails_Table{
Desc: desc, Name: table.Name,
Desc: desc,
Name: table.Name,
SeqVal: table.SeqVal,
IsNew: table.IsNew,
TargetCols: table.TargetCols,
@@ -452,10 +462,10 @@ func (r *importResumer) prepareTablesForIngestion(
// wait for all nodes to see the same descriptor version before doing so.
if !hasExistingTables {
importDetails.Walltime = p.ExecCfg().Clock.Now().WallTime
// TODO(msbutler): add import start time to IMPORT PGDUMP/MYSQL descriptor
} else {
importDetails.Walltime = 0
}

return importDetails, nil
}

@@ -680,6 +690,32 @@ func (r *importResumer) prepareSchemasForIngestion(
return schemaMetadata, err
}

// bindImportStartTime writes the ImportStartWallTime to the table descriptor.
func bindImportStartTime(
ctx context.Context, p sql.JobExecContext, id catid.DescID, startWallTime int64,
) error {
if err := sql.DescsTxn(ctx, p.ExecCfg(), func(
ctx context.Context, txn *kv.Txn, descsCol *descs.Collection,
) error {
mutableDesc, err := descsCol.GetMutableTableVersionByID(ctx, id, txn)
if err != nil {
return err
}
if err := mutableDesc.InitializeImport(startWallTime); err != nil {
return err
}
if err := descsCol.WriteDesc(
ctx, false /* kvTrace */, mutableDesc, txn,
); err != nil {
return err
}
return nil
}); err != nil {
return err
}
return nil
}

// createSchemaDescriptorWithID writes a schema descriptor with `id` to disk.
func createSchemaDescriptorWithID(
ctx context.Context,
@@ -983,7 +1019,7 @@ func (r *importResumer) publishTables(
c.Validity = descpb.ConstraintValidity_Unvalidated
}
}

newTableDesc.FinalizeImport()
// TODO(dt): re-validate any FKs?
if err := descsCol.WriteDescToBatch(
ctx, false /* kvTrace */, newTableDesc, b,
@@ -1554,6 +1590,7 @@ func (r *importResumer) dropTables(
return err
}
intoDesc.SetPublic()
intoDesc.FinalizeImport()
const kvTrace = false
if err := descsCol.WriteDescToBatch(ctx, kvTrace, intoDesc, b); err != nil {
return err