Skip to content

Commit

Permalink
sql: add support for CTAS AS OF SYSTEM TIME
Browse files Browse the repository at this point in the history
Previously, running a statement of the form

    CREATE TABLE t AS SELECT ... FROM ... AS OF SYSTEM TIME x

was not supported.

Now, it is supported. The semantics are that the table creation happens
at the transaction timestamp, but the backfill that's performed to fetch
the data from the `SELECT` is performed at the user-specified timestamp
x.

This is useful for copying data from tables that are experiencing write
traffic. Reading the contended table's data at a historical timestamp
avoids contention on the CREATE TABLE AS.

Release note (sql change): CREATE TABLE AS SELECT ... FROM ... AS OF
SYSTEM TIME x is now supported.
  • Loading branch information
jordanlewis committed Nov 28, 2020
1 parent e31d60c commit 72aa85a
Show file tree
Hide file tree
Showing 10 changed files with 208 additions and 25 deletions.
52 changes: 47 additions & 5 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,25 +530,37 @@ func (ex *connExecutor) execStmtInOpenState(
// don't return any event unless an error happens.

if os.ImplicitTxn.Get() {
asOfTs, err := p.isAsOf(ctx, ast)
asOfTs, timestampType, err := p.isAsOf(ctx, ast)
if err != nil {
return makeErrEvent(err)
}
if asOfTs != nil {
p.semaCtx.AsOfTimestamp = asOfTs
p.extendedEvalCtx.SetTxnTimestamp(asOfTs.GoTime())
ex.state.setHistoricalTimestamp(ctx, *asOfTs)
switch timestampType {
case transactionTimestamp:
p.semaCtx.AsOfTimestamp = asOfTs
p.extendedEvalCtx.SetTxnTimestamp(asOfTs.GoTime())
ex.state.setHistoricalTimestamp(ctx, *asOfTs)
case backfillTimestamp:
p.semaCtx.AsOfTimestampForBackfill = asOfTs
}
}
} else {
// If we're in an explicit txn, we allow AOST but only if it matches with
// the transaction's timestamp. This is useful for running AOST statements
// using the InternalExecutor inside an external transaction; one might want
// to do that to force p.avoidCachedDescriptors to be set below.
ts, err := p.isAsOf(ctx, ast)
ts, timestampType, err := p.isAsOf(ctx, ast)
if err != nil {
return makeErrEvent(err)
}
if ts != nil {
if timestampType == backfillTimestamp {
// Can't handle this: we don't know how to do a CTAS with a historical
// read timestamp and a present write timestamp.
err = unimplemented.NewWithIssueDetailf(35712, "historical ctas in explicit txn",
"historical CREATE TABLE AS unsupported in explicit transaction")
return makeErrEvent(err)
}
if readTs := ex.state.getReadTimestamp(); *ts != readTs {
err = pgerror.Newf(pgcode.Syntax,
"inconsistent AS OF SYSTEM TIME timestamp; expected: %s", readTs)
Expand Down Expand Up @@ -619,6 +631,8 @@ func (ex *connExecutor) execStmtInOpenState(
p.stmt = stmt
p.cancelChecker = cancelchecker.NewCancelChecker(ctx)
p.autoCommit = os.ImplicitTxn.Get() && !ex.server.cfg.TestingKnobs.DisableAutoCommit

// Now actually execute the statement!
if err := ex.dispatchToExecutionEngine(ctx, p, res); err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -759,13 +773,39 @@ func (ex *connExecutor) dispatchToExecutionEngine(
ex.sessionTracing.TracePlanStart(ctx, stmt.AST.StatementTag())
ex.statsCollector.phaseTimes[plannerStartLogicalPlan] = timeutil.Now()

var originalTxn *kv.Txn
if planner.semaCtx.AsOfTimestampForBackfill != nil {
// If we've been tasked with backfilling a schema change operation at a
// particular system time, it's important that we do planning for the
// operation at the timestamp that we're expecting to perform the backfill
// at, in case the schema of the objects that we read have changed in
// between the present transaction timestamp and the user-defined backfill
// timestamp.
//
// Set the planner's transaction to a new historical transaction pinned at
// that timestamp. We'll restore it after planning.
historicalTxn := kv.NewTxnWithSteppingEnabled(ctx, ex.transitionCtx.db, ex.transitionCtx.nodeIDOrZero)
historicalTxn.SetFixedTimestamp(ctx, *planner.semaCtx.AsOfTimestampForBackfill)
originalTxn = planner.txn
planner.txn = historicalTxn
}
// Prepare the plan. Note, the error is processed below. Everything
// between here and there needs to happen even if there's an error.
//
// As a note about planning in a historical context (happens if we enter the
// stanza above due to an AOST backfill query), we don't ever expect a retry
// error to come out of planning.
err := ex.makeExecPlan(ctx, planner)
// We'll be closing the plan manually below after execution; this
// defer is a catch-all in case some other return path is taken.
defer planner.curPlan.close(ctx)

if originalTxn != nil {
// Reset the planner's transaction to the current-timestamp, original
// transaction.
planner.txn = originalTxn
}

if planner.autoCommit {
planner.curPlan.flags.Set(planFlagImplicitTxn)
}
Expand Down Expand Up @@ -852,7 +892,9 @@ func (ex *connExecutor) dispatchToExecutionEngine(
default:
planner.curPlan.flags.Set(planFlagNotDistributed)
}

ex.sessionTracing.TraceExecStart(ctx, "distributed")
// Dispatch the query to the execution engine.
stats, err := ex.execWithDistSQLEngine(
ctx, planner, stmt.AST.StatementType(), res, distributePlan.WillDistribute(), progAtomic,
)
Expand Down
9 changes: 8 additions & 1 deletion pkg/sql/conn_executor_prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/querycache"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/fsm"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -234,11 +235,17 @@ func (ex *connExecutor) populatePrepared(
}
p.extendedEvalCtx.PrepareOnly = true

protoTS, err := p.isAsOf(ctx, stmt.AST)
protoTS, timestampType, err := p.isAsOf(ctx, stmt.AST)
if err != nil {
return 0, err
}
if protoTS != nil {
if timestampType != transactionTimestamp {
// Can't handle this: we don't know how to do a CTAS with a historical
// read timestamp and a present write timestamp.
return 0, unimplemented.NewWithIssueDetailf(35712, "historical prepared backfill",
"historical CREATE TABLE AS unsupported in explicit transaction")
}
p.semaCtx.AsOfTimestamp = protoTS
txn.SetFixedTimestamp(ctx, *protoTS)
}
Expand Down
15 changes: 14 additions & 1 deletion pkg/sql/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,11 @@ func (n *createTableNode) startExec(params runParams) error {
// newTableDescIfAs does it automatically).
asCols = asCols[:len(asCols)-1]
}
// Set creationTime to the AS OF SYSTEM TIME that was stored in our As clause
// if there was one.
if params.p.semaCtx.AsOfTimestampForBackfill != nil {
creationTime = *params.p.semaCtx.AsOfTimestampForBackfill
}

desc, err = newTableDescIfAs(params,
n.n, n.dbDesc.GetID(), schemaID, id, creationTime, asCols, privs, params.p.EvalContext())
Expand Down Expand Up @@ -387,6 +392,14 @@ func (n *createTableNode) startExec(params runParams) error {
// If we are in an explicit txn or the source has placeholders, we execute the
// CTAS query synchronously.
if n.n.As() && !params.p.ExtendedEvalContext().TxnImplicit {
// If we're doing an explicit transaction, we can't do a historical CTAS
// so we should bail out.
if params.p.semaCtx.AsOfTimestampForBackfill != nil {
// We shouldn't get here in normal operation, but we'll check just in
// case.
return errors.AssertionFailedf("CTAS AS OF timestamp set in explicit txn")
}

err = func() error {
// The data fill portion of CREATE AS must operate on a read snapshot,
// so that it doesn't end up observing its own writes.
Expand Down Expand Up @@ -1105,7 +1118,7 @@ func getFinalSourceQuery(source *tree.Select, evalCtx *tree.EvalContext) string
f.Close()

// Substitute placeholders with their values.
ctx := tree.NewFmtCtx(tree.FmtSerializable)
ctx := tree.NewFmtCtx(tree.FmtSerializable | tree.FmtSkipAsOfSystemTimeClauses)
ctx.SetPlaceholderFormat(func(ctx *tree.FmtCtx, placeholder *tree.Placeholder) {
d, err := placeholder.Eval(evalCtx)
if err != nil {
Expand Down
37 changes: 30 additions & 7 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1115,12 +1115,29 @@ func ParseHLC(s string) (hlc.Timestamp, error) {
return tree.DecimalToHLC(dec)
}

// asOfTimestampType is used during processing of AOST clauses: depending on the
// context of an AOST clause, we should either set the main user's transaction
// timestamp or use the timestamp for a historical backfill (in the case of
// CREATE TABLE AS and other such statements).
type asOfTimestampType int

const (
// transactionTimestamp indicates that the AOST clause should apply to the
// user's transaction.
transactionTimestamp asOfTimestampType = iota + 1
// backfillTimestamp indicates that the AOST clause should apply to a backfill
// operation.
backfillTimestamp
)

// isAsOf analyzes a statement to bypass the logic in newPlan(), since
// that requires the transaction to be started already. If the returned
// timestamp is not nil, it is the timestamp to which a transaction
// should be set. The statements that will be checked are Select,
// ShowTrace (of a Select statement), Scrub, Export, and CreateStats.
func (p *planner) isAsOf(ctx context.Context, stmt tree.Statement) (*hlc.Timestamp, error) {
func (p *planner) isAsOf(
ctx context.Context, stmt tree.Statement,
) (*hlc.Timestamp, asOfTimestampType, error) {
var asOf tree.AsOfClause
switch s := stmt.(type) {
case *tree.Select:
Expand All @@ -1133,32 +1150,38 @@ func (p *planner) isAsOf(ctx context.Context, stmt tree.Statement) (*hlc.Timesta

sc, ok := selStmt.(*tree.SelectClause)
if !ok {
return nil, nil
return nil, 0, nil
}
if sc.From.AsOf.Expr == nil {
return nil, nil
return nil, 0, nil
}

asOf = sc.From.AsOf
case *tree.Scrub:
if s.AsOf.Expr == nil {
return nil, nil
return nil, 0, nil
}
asOf = s.AsOf
case *tree.Export:
return p.isAsOf(ctx, s.Query)
case *tree.CreateStats:
if s.Options.AsOf.Expr == nil {
return nil, nil
return nil, 0, nil
}
asOf = s.Options.AsOf
case *tree.Explain:
return p.isAsOf(ctx, s.Statement)
case *tree.CreateTable:
if !s.As() {
return nil, 0, nil
}
ts, _, err := p.isAsOf(ctx, s.AsSource)
return ts, backfillTimestamp, err
default:
return nil, nil
return nil, 0, nil
}
ts, err := p.EvalAsOfTimestamp(ctx, asOf)
return &ts, err
return &ts, transactionTimestamp, err
}

// isSavepoint returns true if ast is a SAVEPOINT statement.
Expand Down
83 changes: 81 additions & 2 deletions pkg/sql/logictest/testdata/logic_test/create_as
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
let $ts
SELECT now()

statement ok
CREATE TABLE stock (item, quantity) AS VALUES ('cups', 10), ('plates', 15), ('forks', 30)

Expand Down Expand Up @@ -40,8 +43,8 @@ forks blue
forks red
forks green

statement error pq: AS OF SYSTEM TIME must be provided on a top-level statement
CREATE TABLE t AS SELECT * FROM stock AS OF SYSTEM TIME '2016-01-01'
statement error pgcode 42P01 relation "stock" does not exist
CREATE TABLE t AS SELECT * FROM stock AS OF SYSTEM TIME '$ts'

statement error pgcode 42601 CREATE TABLE specifies 3 column names, but data source has 2 columns
CREATE TABLE t2 (col1, col2, col3) AS SELECT * FROM stock
Expand Down Expand Up @@ -362,3 +365,79 @@ SELECT * FROM t
----
1 1 false
2 2 true

# Test CTAS as of system time.

# Sleep for a millisecond to guarantee that the statement below will have
# occurred at least a millisecond after the insert into the stock table.
statement ok
SELECT pg_sleep(0.001)

statement ok
CREATE TABLE stockcopy AS SELECT * FROM stock AS OF SYSTEM TIME '-1 ms'

statement count 3
SELECT * FROM stockcopy

statement ok
INSERT INTO stock VALUES ('spoons', 10)

# Run a CTAS at a timestamp before we inserted the new value, and make sure
# that the newly-created table does not contain the new value.
let $ts
SELECT crdb_internal.approximate_timestamp(crdb_internal_mvcc_timestamp) -
'1ms'::interval FROM stock WHERE item = 'spoons';

statement ok
CREATE TABLE stocknospoons AS SELECT * FROM stock AS OF SYSTEM TIME '$ts'

query I
SELECT count(*) FROM stocknospoons WHERE item = 'spoons'
----
0

# Make sure that testing after the timestamp produces a result that
# includes the new row.

let $ts
SELECT crdb_internal.approximate_timestamp(crdb_internal_mvcc_timestamp) +
'1ms'::interval FROM stock WHERE item = 'spoons';

statement ok
CREATE TABLE stockwithspoons AS SELECT * FROM stock AS OF SYSTEM TIME '$ts'

query I
SELECT count(*) FROM stockwithspoons WHERE item = 'spoons'
----
1

statement ok
ALTER TABLE stock ADD COLUMN newcol INT DEFAULT 1

statement ok
CREATE TABLE stockafterschemachange AS SELECT * FROM stock AS OF SYSTEM TIME '$ts'

query error column "newcol" does not exist
SELECT newcol FROM stockafterschemachange

query I
SELECT count(*) FROM stockafterschemachange WHERE item = 'spoons'
----
1

## Test that CTAS AOST doesn't mix with explicit txns.
statement ok
BEGIN

statement error unimplemented: historical CREATE TABLE AS unsupported in explicit transaction
CREATE TABLE willfail AS SELECT * FROM stock AS OF SYSTEM TIME '-1s'

statement ok
ROLLBACK

statement error syntax error
CREATE TABLE willfail (a INT) AS OF SYSTEM TIME '-1s'

statement error unimplemented: cannot specify AS OF SYSTEM TIME with different timestamps
CREATE TABLE willfail AS SELECT *, (SELECT count(1) FROM STOCK AS OF SYSTEM TIME '-2s')
FROM stock AS OF SYSTEM TIME '-1s'
2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/views
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ statement ok
CREATE VIEW v AS SELECT d, t FROM t

statement error pq: AS OF SYSTEM TIME must be provided on a top-level statement
CREATE TABLE t2 AS SELECT d, t FROM t AS OF SYSTEM TIME '2017-02-13 21:30:00'
INSERT INTO t SELECT d, t FROM t AS OF SYSTEM TIME '2017-02-13 21:30:00'

statement ok
DROP TABLE t CASCADE
Expand Down
17 changes: 11 additions & 6 deletions pkg/sql/opt/optbuilder/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -1299,15 +1299,20 @@ func (b *Builder) validateAsOf(asOf tree.AsOfClause) {
panic(err)
}

if b.semaCtx.AsOfTimestamp == nil {
if b.semaCtx.AsOfTimestamp != nil {
if *b.semaCtx.AsOfTimestamp != ts {
panic(unimplementedWithIssueDetailf(35712, "",
"cannot specify AS OF SYSTEM TIME with different timestamps"))
}
} else if b.semaCtx.AsOfTimestampForBackfill != nil {
if *b.semaCtx.AsOfTimestampForBackfill != ts {
panic(unimplementedWithIssueDetailf(35712, "",
"cannot specify AS OF SYSTEM TIME with different timestamps"))
}
} else {
panic(pgerror.Newf(pgcode.Syntax,
"AS OF SYSTEM TIME must be provided on a top-level statement"))
}

if *b.semaCtx.AsOfTimestamp != ts {
panic(unimplementedWithIssueDetailf(35712, "",
"cannot specify AS OF SYSTEM TIME with different timestamps"))
}
}

// validateLockingInFrom checks for operations that are not supported with FOR
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/sem/tree/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ const (
// rather than string literals. For example, the bytes \x40 will be formatted
// as b'\x40' rather than '\x40'.
fmtFormatByteLiterals

// FmtSkipAsOfSystemTimeClauses prevents the formatter from printing AS OF
// SYSTEM TIME clauses.
FmtSkipAsOfSystemTimeClauses
)

// Composite/derived flag definitions follow.
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/sem/tree/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,10 @@ type AsOfClause struct {

// Format implements the NodeFormatter interface.
func (a *AsOfClause) Format(ctx *FmtCtx) {
ctx.WriteString("AS OF SYSTEM TIME ")
ctx.FormatNode(a.Expr)
if !ctx.flags.HasFlags(FmtSkipAsOfSystemTimeClauses) {
ctx.WriteString("AS OF SYSTEM TIME ")
ctx.FormatNode(a.Expr)
}
}

// From represents a FROM clause.
Expand Down
Loading

0 comments on commit 72aa85a

Please sign in to comment.