Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

importer: add ImportEpoch table descriptor field #85692

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -283,4 +283,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 22.1-40 set the active cluster version in the format '<major>.<minor>'
version version 22.1-42 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,6 @@
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.span_registry.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://<ui>/#/debug/tracez</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>22.1-40</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>22.1-42</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
24 changes: 24 additions & 0 deletions pkg/ccl/backupccl/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,30 @@ func TestDataDriven(t *testing.T) {
require.NoError(t, err)
return ""

case "import":
server := lastCreatedServer
user := "root"
jobType := "IMPORT"

// First, run the import.
_, err := ds.getSQLDB(t, server, user).Exec(d.Input)

// Tag the job.
if d.HasArg("tag") {
tagJob(t, server, user, jobType, ds, d)
}

// Check if we expect a pausepoint error.
if d.HasArg("expect-pausepoint") {
expectPausepoint(t, err, jobType, server, user, ds)
ret := append(ds.noticeBuffer, "job paused at pausepoint")
return strings.Join(ret, "\n")
}

// All other errors are bad.
require.NoError(t, err)
return ""

case "restore":
server := lastCreatedServer
user := "root"
Expand Down
81 changes: 81 additions & 0 deletions pkg/ccl/backupccl/testdata/backup-restore/import-epoch
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Test that Import INTO properly increments the importEpoch descriptor field only when the user runs
# an IMPORT INTO on a non-empty table.

new-server name=s1
----

exec-sql
CREATE DATABASE d;
USE d;
CREATE TABLE foo (i INT PRIMARY KEY, s STRING);
CREATE TABLE baz (i INT PRIMARY KEY, s STRING);
INSERT INTO baz VALUES (1, 'x'),(2,'y'),(3,'z');
----

exec-sql
CREATE VIEW import_epoch (epoch, type)
AS WITH tbls AS (
SELECT id, crdb_internal.pb_to_json('cockroach.sql.sqlbase.Descriptor', descriptor) AS orig FROM system.descriptor
)
SELECT orig->'table'->'importEpoch', orig->'table'->'importTypeInProgress' FROM tbls WHERE id = '109';
----

exec-sql
EXPORT INTO CSV 'nodelocal://0/export1/' FROM SELECT * FROM baz WHERE i = 1;
----

exec-sql
IMPORT INTO foo (i,s) CSV DATA ('nodelocal://0/export1/export*-n*.0.csv')
----

query-sql
SELECT name, id FROM system.namespace WHERE name = 'foo';
----
foo 109

query-sql
SELECT * FROM import_epoch
----
1 <nil>

exec-sql
EXPORT INTO CSV 'nodelocal://0/export2/' FROM SELECT * FROM baz WHERE i = 2;
----

exec-sql
IMPORT INTO foo (i,s) CSV DATA ('nodelocal://0/export2/export*-n*.0.csv')
----

query-sql
SELECT * FROM import_epoch
----
2 <nil>

exec-sql
SET CLUSTER SETTING jobs.debug.pausepoints = 'import.after_ingest';
----

exec-sql
EXPORT INTO CSV 'nodelocal://0/export3/' FROM SELECT * FROM baz WHERE i = 3;
----

# Ensure the ImportEpoch increments before planning and does not roll back after the IMPORT INTO
# job gets cancelled.
import expect-pausepoint tag=a
IMPORT INTO foo (i,s) CSV DATA ('nodelocal://0/export3/export*-n*.0.csv')
----
job paused at pausepoint

query-sql
SELECT * FROM import_epoch
----
3 "IMPORT_INTO_NON_EMPTY"

# Cancel the job so that the cleanup hook runs.
job cancel=a
----

query-sql
SELECT * FROM import_epoch
----
3 <nil>
12 changes: 6 additions & 6 deletions pkg/cli/testdata/doctor/test_recreate_zipdir

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,8 @@ const (
UsersHaveIDs
// SetUserIDNotNull sets the user_id column in system.users to not null.
SetUserIDNotNull
// ImportRollbacksWithoutMVCC allows IMPORT to rollback imports without MVCC timestamps
ImportRollbacksWithoutMVCC

// *************************************************
// Step (1): Add new versions here.
Expand Down Expand Up @@ -590,6 +592,10 @@ var versionsSingleton = keyedVersions{
Key: SetUserIDNotNull,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 40},
},
{
Key: ImportRollbacksWithoutMVCC,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 42},
},
// *************************************************
// Step (2): Add new versions here.
// Do not add new versions to a patch release.
Expand Down
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 17 additions & 1 deletion pkg/sql/catalog/descpb/structured.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1180,7 +1180,23 @@ message TableDescriptor {
// this table, in which case the global setting is used.
optional bool forecast_stats = 52 [(gogoproto.nullable) = true, (gogoproto.customname) = "ForecastStats"];

// Next ID: 54
// ImportEpoch is the count of IMPORT INTO jobs that have been attempted on this
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// ImportIntoEpoch is the count of IMPORT INTO jobs that have been attempted on this
// ImportEpoch is the count of IMPORT INTO jobs that have been attempted on this

// table once it __already had data__. The ImportEpoch can include an in progress import, as the
// field gets incremented while preparing the table for ingestion.
optional uint32 import_epoch = 54 [(gogoproto.nullable) = false, (gogoproto.customname) = "ImportEpoch"];

// ImportType indicates the kind of import currently occurring on the table.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could explain the differences. Why is IMPORT a separate thing?

enum ImportType {
// NO_IMPORT indicates that no import is currently in progress on the table.
NO_IMPORT = 0;
// IMPORT indicates a plain IMPORT job, i.e. one that creates the table it
// ingests into (set for newly created tables during import preparation).
IMPORT = 1;
// IMPORT_INTO_EMPTY indicates an IMPORT INTO on an existing table that
// contained no rows when the job started.
IMPORT_INTO_EMPTY = 2;
// IMPORT_INTO_NON_EMPTY indicates an IMPORT INTO on an existing table that
// already contained data when the job started.
IMPORT_INTO_NON_EMPTY = 3;
}

// ImportTypeInProgress describes the type of in-progress import, if any.
optional ImportType import_type_in_progress = 55 [(gogoproto.nullable) = false, (gogoproto.customname) = "ImportTypeInProgress"];

// Next ID: 56
}

// SurvivalGoal is the survival goal for a database.
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/catalog/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,9 @@ type TableDescriptor interface {
// IsRefreshViewRequired indicates if a REFRESH VIEW operation needs to be called
// on a materialized view.
IsRefreshViewRequired() bool
// GetInProgressImportEpoch gets the current import epoch and import type if
// there's an in progress import.
GetInProgressImportEpoch() (uint32, descpb.TableDescriptor_ImportType)
}

// MutableTableDescriptor is both a MutableDescriptor and a TableDescriptor.
Expand Down
24 changes: 24 additions & 0 deletions pkg/sql/catalog/tabledesc/table_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ var _ catalog.TableDescriptor = (*wrapper)(nil)
// descriptors at this cluster version.
const ConstraintIDsAddedToTableDescsVersion = clusterversion.RemoveIncompatibleDatabasePrivileges

// OfflineReasonImporting is the canonical offline reason recorded on a table
// descriptor while it is taken offline for the duration of an IMPORT.
const OfflineReasonImporting = "importing"

// wrapper is the base implementation of the catalog.Descriptor
// interface, which is overloaded by immutable and Mutable.
type wrapper struct {
Expand Down Expand Up @@ -185,6 +188,18 @@ func (desc *Mutable) SetPublicNonPrimaryIndex(indexOrdinal int, index descpb.Ind
desc.Indexes[indexOrdinal-1] = index
}

// InitializeImport records on the descriptor that an import job is starting:
// it stamps ImportTypeInProgress with the kind of import and bumps
// ImportEpoch.
func (desc *Mutable) InitializeImport(importType descpb.TableDescriptor_ImportType) {
	desc.ImportTypeInProgress = importType
	desc.ImportEpoch = desc.ImportEpoch + 1
}

// FinalizeImport resets ImportTypeInProgress to NO_IMPORT, marking that no
// import is in progress on the table. ImportEpoch is left untouched, so the
// epoch continues to count all IMPORT INTO attempts, including rolled-back
// ones.
func (desc *Mutable) FinalizeImport() {
	desc.ImportTypeInProgress = descpb.TableDescriptor_NO_IMPORT
}

// UpdateIndexPartitioning applies the new partition and adjusts the column info
// for the specified index descriptor. Returns false iff this was a no-op.
func UpdateIndexPartitioning(
Expand Down Expand Up @@ -654,3 +669,12 @@ func (desc *wrapper) GetObjectType() privilege.ObjectType {
}
return privilege.Table
}

// GetInProgressImportEpoch returns the descriptor's ImportEpoch together with
// the type of the in-progress import. When no import is in progress it
// returns (0, NO_IMPORT).
func (desc *wrapper) GetInProgressImportEpoch() (uint32, descpb.TableDescriptor_ImportType) {
	if desc.ImportTypeInProgress == descpb.TableDescriptor_NO_IMPORT {
		return 0, descpb.TableDescriptor_NO_IMPORT
	}
	return desc.ImportEpoch, desc.ImportTypeInProgress
}
2 changes: 2 additions & 0 deletions pkg/sql/catalog/tabledesc/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ var validationMap = []struct {
"DeclarativeSchemaChangerState": {status: iSolemnlySwearThisFieldIsValidated},
"AutoStatsSettings": {status: iSolemnlySwearThisFieldIsValidated},
"ForecastStats": {status: thisFieldReferencesNoObjects},
"ImportEpoch": {status: thisFieldReferencesNoObjects},
"ImportTypeInProgress": {status: thisFieldReferencesNoObjects},
},
},
{
Expand Down
53 changes: 49 additions & 4 deletions pkg/sql/importer/import_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/gcjob"
"github.com/cockroachdb/cockroach/pkg/sql/opt/memo"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
Expand Down Expand Up @@ -267,6 +268,17 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
return errors.Wrap(err, "checking if existing table is empty")
}
details.Tables[i].WasEmpty = len(res) == 0
if p.ExecCfg().Settings.Version.IsActive(ctx, clusterversion.ImportRollbacksWithoutMVCC) {
importType := descpb.TableDescriptor_IMPORT_INTO_NON_EMPTY
if details.Tables[i].WasEmpty {
importType = descpb.TableDescriptor_IMPORT_INTO_EMPTY
}
importEpoch, err := incrementImportEpoch(ctx, p, tblDesc.GetID(), importType)
if err != nil {
return err
}
details.Tables[i].Desc.ImportEpoch = importEpoch
}
}
}

Expand Down Expand Up @@ -386,7 +398,8 @@ func (r *importResumer) prepareTablesForIngestion(
return importDetails, err
}
importDetails.Tables[i] = jobspb.ImportDetails_Table{
Desc: desc, Name: table.Name,
Desc: desc,
Name: table.Name,
SeqVal: table.SeqVal,
IsNew: table.IsNew,
TargetCols: table.TargetCols,
Expand Down Expand Up @@ -480,7 +493,7 @@ func prepareExistingTablesForIngestion(
// Take the table offline for import.
// TODO(dt): audit everywhere we get table descs (leases or otherwise) to
// ensure that filtering by state handles IMPORTING correctly.
importing.SetOffline("importing")
importing.SetOffline(tabledesc.OfflineReasonImporting)

// TODO(dt): de-validate all the FKs.
if err := descsCol.WriteDesc(
Expand Down Expand Up @@ -564,7 +577,10 @@ func prepareNewTablesForIngestion(
// as tabledesc.TableDescriptor.
tableDescs := make([]catalog.TableDescriptor, len(newMutableTableDescriptors))
for i := range tableDescs {
newMutableTableDescriptors[i].SetOffline("importing")
newMutableTableDescriptors[i].SetOffline(tabledesc.OfflineReasonImporting)
if p.ExecCfg().Settings.Version.IsActive(ctx, clusterversion.ImportRollbacksWithoutMVCC) {
newMutableTableDescriptors[i].InitializeImport(descpb.TableDescriptor_IMPORT)
}
tableDescs[i] = newMutableTableDescriptors[i]
}

Expand Down Expand Up @@ -678,6 +694,34 @@ func (r *importResumer) prepareSchemasForIngestion(
return schemaMetadata, err
}

// incrementImportEpoch bumps the ImportEpoch on the table descriptor with the
// given ID and stamps it with importType, persisting the change in its own
// descriptor transaction. It returns the new epoch value.
func incrementImportEpoch(
	ctx context.Context,
	p sql.JobExecContext,
	id catid.DescID,
	importType descpb.TableDescriptor_ImportType,
) (uint32, error) {
	var epoch uint32
	err := sql.DescsTxn(ctx, p.ExecCfg(), func(
		ctx context.Context, txn *kv.Txn, descsCol *descs.Collection,
	) error {
		tableDesc, err := descsCol.GetMutableTableVersionByID(ctx, id, txn)
		if err != nil {
			return err
		}
		tableDesc.InitializeImport(importType)
		// InitializeImport has already incremented the epoch; capture it now.
		// The value is only returned to the caller if the txn commits.
		epoch = tableDesc.ImportEpoch
		return descsCol.WriteDesc(ctx, false /* kvTrace */, tableDesc, txn)
	})
	if err != nil {
		return 0, err
	}
	return epoch, nil
}

// createSchemaDescriptorWithID writes a schema descriptor with `id` to disk.
func createSchemaDescriptorWithID(
ctx context.Context,
Expand Down Expand Up @@ -981,7 +1025,7 @@ func (r *importResumer) publishTables(
c.Validity = descpb.ConstraintValidity_Unvalidated
}
}

newTableDesc.FinalizeImport()
// TODO(dt): re-validate any FKs?
if err := descsCol.WriteDescToBatch(
ctx, false /* kvTrace */, newTableDesc, b,
Expand Down Expand Up @@ -1526,6 +1570,7 @@ func (r *importResumer) dropTables(
return err
}
intoDesc.SetPublic()
intoDesc.FinalizeImport()
const kvTrace = false
if err := descsCol.WriteDescToBatch(ctx, kvTrace, intoDesc, b); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/importer/read_import_pgdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func createPostgresSchemas(
if err != nil {
return nil, err
}
desc.SetOffline("importing")
desc.SetOffline(tabledesc.OfflineReasonImporting)
return desc, nil
}
var schemaDescs []*schemadesc.Mutable
Expand Down