Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
96295: changefeedccl: Rename and remove CDC functions r=miretskiy a=miretskiy

This PR renames and removes CDC specific functions, while maintaining backward compatability.

* `cdc_is_delete()` function removed.  It is replaced
  with `event_op()` function which returns a string describing
  the type of event.  If the changefeed is running with `diff`
  option, then this function returns `insert`, `update`, or
  `delete`.  If changefeed is running without the `diff` option,
  we can't tell an update from insert, so this function returns
  `upsert` or `delete`.
* `cdc_mvcc_timestamp()` function removed.  This information can be accessed 
    via cockroach standard system column `crdb_internal_mvcc_timestamp`.  
    The same timestamp column is avaiable in the previous row state 
    `(cdc_prev).crdb_internal_mvcc_timestamp`
* `cdc_updated_timestamp()` function renamed as `event_schema_timestamp()`

Fixes #92482
Epic: CRDB-17161

Release note (enterprise change): Deprecate and replace some of the changefeed specific functions made available in preview mode in 22.2
release.   While these functions are deprecated, old changefeed
transformations should continue to function properly.
Customers are encouraged to closely monitor their changefeeds
created during upgrade.  While effort was made to maintain backward
compatibility, the updated changefeed transformation may produce
slightly different output (different column names, etc).

97041: parser: add `QUERY` to `bare_label_keyword` r=ZhouXing19 a=ZhouXing19

As revealed by #84309, some keywords not added to `bare_label_keywords` in `sql.y` make the `SELECT ... <keyword>` statement error out, which is not compatible with postgres. This commit is to add the `QUERY` keyword per a support ticket. We're not adding the whole `unreserved_keyword` list here as having some of them in `bare_label_keyword`, such as `DAY`, brings `reduce` errors.

Postgres:

```
postgres=# select substring('stringstringstring',1,10) query;
   query
------------
 stringstri
(1 row)
```

CRDB:

```
root@localhost:26257/defaultdb> select substring('stringstringstring',1,10)  query;
invalid syntax: statement ignored: at or near "query": syntax error
SQLSTATE: 42601
DETAIL: source SQL:
select substring('stringstringstring',1,10)  query
                                             ^
```

informs #84309

Release Note(bug fix): fixed the syntax error for `SELECT ... QUERY` (without AS) statement.

97134: storage: include version in benchmark fixture directory r=RaduBerinde a=RaduBerinde

This benchmark is sometimes failing in CI and I cannot reproduce locally. The error is consistent with a leftover fixture from an older version (which shouldn't happen because CI should be cleaning up the repo).

This change adds the version to the fixture name so that this kind of cross-version problem cannot occur. This is a good idea regardless of the CI issue.

Informs #97061

Release note: None
Epic: none

97149: sql: add missing STORING clause in system.privileges user ID migration r=rafiss a=andyyang890

This patch fixes a discrepancy between the bootstrap schema and user ID
migration for the system.privileges table.

Part of #87079

Release note: None

Co-authored-by: Yevgeniy Miretskiy <[email protected]>
Co-authored-by: Jane Xing <[email protected]>
Co-authored-by: Radu Berinde <[email protected]>
Co-authored-by: Andy Yang <[email protected]>
  • Loading branch information
5 people committed Feb 15, 2023
5 parents f886e7c + 5265347 + db011d3 + 204ad97 + 93f134a commit 3c4d3ef
Show file tree
Hide file tree
Showing 23 changed files with 298 additions and 137 deletions.
1 change: 1 addition & 0 deletions docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -3426,6 +3426,7 @@ bare_label_keywords ::=
| 'INVOKER'
| 'LEAKPROOF'
| 'PARALLEL'
| 'QUERY'
| 'RETURN'
| 'RETURNS'
| 'SECURITY'
Expand Down
45 changes: 34 additions & 11 deletions pkg/ccl/changefeedccl/cdceval/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
package cdceval

import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/errors"
)
Expand All @@ -23,18 +25,39 @@ func RewritePreviewExpression(oldExpr *tree.SelectClause) (*tree.SelectClause, e
default:
return true, expr, nil
case *tree.FuncExpr:
// Old style cdc_prev() function returned JSONb object.
// Replace this call with a call to row_to_json((cdc_prev).*)
if n, ok := t.Func.FunctionReference.(*tree.UnresolvedName); ok && n.Parts[0] == "cdc_prev" {
rowToJSON := &tree.FuncExpr{
Func: tree.ResolvableFunctionReference{
FunctionReference: tree.FunDefs["row_to_json"],
},
Exprs: tree.Exprs{
&tree.TupleStar{Expr: tree.NewUnresolvedName("cdc_prev")},
},
if n, ok := t.Func.FunctionReference.(*tree.UnresolvedName); ok {
switch n.Parts[0] {
case "cdc_prev":
// Old style cdc_prev() function returned JSONb object.
// Replace this call with a call to row_to_json((cdc_prev).*)
rowToJSON := &tree.FuncExpr{
Func: tree.ResolvableFunctionReference{
FunctionReference: tree.FunDefs["row_to_json"],
},
Exprs: tree.Exprs{
&tree.TupleStar{Expr: tree.NewUnresolvedName("cdc_prev")},
},
}
return true, rowToJSON, nil
case "cdc_is_delete":
// Old style cdc_is_delete returned boolean; use event_op instead.
newExpr, err := parser.ParseExpr("(event_op() = 'delete')")
if err != nil {
return false, expr, err
}
return true, newExpr, nil
case "cdc_mvcc_timestamp":
// Old cdc_mvcc_timestamp() function gets replaced with crdb_intenral_mvcc_timestamp column.
return true, tree.NewUnresolvedName(colinfo.MVCCTimestampColumnName), nil
case "cdc_updated_timestamp":
// Old cdc_updated_timestamp gets replaced with event_schema_timestamp.
newExpr := &tree.FuncExpr{
Func: tree.ResolvableFunctionReference{
FunctionReference: tree.NewUnresolvedName("changefeed_event_schema_timestamp"),
},
}
return true, newExpr, nil
}
return true, rowToJSON, nil
}
}
return true, expr, nil
Expand Down
12 changes: 12 additions & 0 deletions pkg/ccl/changefeedccl/cdceval/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,18 @@ func TestCompatRewrite(t *testing.T) {
oldExpr: "SELECT foo.*, cdc_prev() FROM " + fooRef + " WHERE cdc_prev()->>'field' = 'blah'",
newExpr: "SELECT foo.*, row_to_json((cdc_prev).*) FROM " + fooRef + " WHERE (row_to_json((cdc_prev).*)->>'field') = 'blah'",
},
{
oldExpr: "SELECT foo.*, cdc_is_delete() FROM " + fooRef,
newExpr: "SELECT foo.*, (event_op() = 'delete') FROM " + fooRef,
},
{
oldExpr: "SELECT foo.*, cdc_is_delete() AS was_deleted FROM " + fooRef,
newExpr: "SELECT foo.*, (event_op() = 'delete') AS was_deleted FROM " + fooRef,
},
{
oldExpr: "SELECT foo.*, cdc_mvcc_timestamp() FROM " + fooRef,
newExpr: "SELECT foo.*, crdb_internal_mvcc_timestamp FROM " + fooRef,
},
} {
sc, err := ParseChangefeedExpression(tc.oldExpr)
require.NoError(t, err)
Expand Down
38 changes: 33 additions & 5 deletions pkg/ccl/changefeedccl/cdceval/expr_eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ type Evaluator struct {
execCfg *sql.ExecutorConfig
user username.SQLUsername
sessionData sessiondatapb.SessionData

familyEval map[descpb.FamilyID]*familyEvaluator
withDiff bool
familyEval map[descpb.FamilyID]*familyEvaluator
}

// familyEvaluator is a responsible for evaluating expressions in CDC
Expand Down Expand Up @@ -82,13 +82,15 @@ func NewEvaluator(
user username.SQLUsername,
sd sessiondatapb.SessionData,
statementTS hlc.Timestamp,
withDiff bool,
) *Evaluator {
return &Evaluator{
sc: sc,
execCfg: execCfg,
user: user,
sessionData: sd,
statementTS: statementTS,
withDiff: withDiff,
familyEval: make(map[descpb.FamilyID]*familyEvaluator, 1), // usually, just 1 family.
}
}
Expand All @@ -101,6 +103,7 @@ func newFamilyEvaluator(
user username.SQLUsername,
sd sessiondatapb.SessionData,
statementTS hlc.Timestamp,
withDiff bool,
) *familyEvaluator {
e := familyEvaluator{
targetFamilyID: targetFamilyID,
Expand All @@ -113,6 +116,8 @@ func newFamilyEvaluator(
rowCh: make(chan tree.Datums, 1),
}
e.rowEvalCtx.startTime = statementTS
e.rowEvalCtx.withDiff = withDiff

// Arrange to be notified when event does not match predicate.
predicateAsProjection(e.norm)

Expand Down Expand Up @@ -143,7 +148,9 @@ func (e *Evaluator) Eval(

fe, ok := e.familyEval[updatedRow.FamilyID]
if !ok {
fe = newFamilyEvaluator(e.sc, updatedRow.FamilyID, e.execCfg, e.user, e.sessionData, e.statementTS)
fe = newFamilyEvaluator(
e.sc, updatedRow.FamilyID, e.execCfg, e.user, e.sessionData, e.statementTS, e.withDiff,
)
e.familyEval[updatedRow.FamilyID] = fe
}

Expand Down Expand Up @@ -183,7 +190,7 @@ func (e *familyEvaluator) eval(
}

// Setup context.
if err := e.setupContextForRow(ctx, updatedRow); err != nil {
if err := e.setupContextForRow(ctx, updatedRow, prevRow); err != nil {
return cdcevent.Row{}, err
}

Expand Down Expand Up @@ -447,9 +454,28 @@ func (e *familyEvaluator) copyPrevRow(prev cdcevent.Row) error {

// setupContextForRow configures evaluation context with the provided row
// information.
func (e *familyEvaluator) setupContextForRow(ctx context.Context, updated cdcevent.Row) error {
func (e *familyEvaluator) setupContextForRow(
ctx context.Context, updated cdcevent.Row, prevRow cdcevent.Row,
) error {
e.rowEvalCtx.ctx = ctx
e.rowEvalCtx.updatedRow = updated

if updated.IsDeleted() {
e.rowEvalCtx.op = eventTypeDelete
} else {
// Insert or update.
if e.rowEvalCtx.withDiff {
if prevRow.IsInitialized() {
e.rowEvalCtx.op = eventTypeUpdate
} else {
e.rowEvalCtx.op = eventTypeInsert
}
} else {
// Without diff option we can't tell insert from update; so, use upsert.
e.rowEvalCtx.op = eventTypeUpsert
}
}

return nil
}

Expand Down Expand Up @@ -478,7 +504,9 @@ func (e *familyEvaluator) closeErr() error {
type rowEvalContext struct {
ctx context.Context
startTime hlc.Timestamp
withDiff bool
updatedRow cdcevent.Row
op tree.Datum
}

// cdcAnnotationAddr is the address used to store relevant information
Expand Down
22 changes: 11 additions & 11 deletions pkg/ccl/changefeedccl/cdceval/expr_eval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package cdceval
import (
"context"
"fmt"
"math/rand"
"sort"
"strconv"
"testing"
Expand All @@ -34,7 +35,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -145,15 +145,15 @@ $$`)
"INSERT INTO foo (a, b) VALUES (2, '2nd test')",
"DELETE FROM foo WHERE a=2 AND b='2nd test'",
},
stmt: "SELECT *, cdc_is_delete() FROM foo WHERE 'hello' != 'world'",
stmt: "SELECT *, event_op() = 'delete' AS deleted FROM foo WHERE 'hello' != 'world'",
expectMainFamily: []decodeExpectation{
{
keyValues: []string{"2nd test", "2"},
allValues: map[string]string{"a": "2", "b": "2nd test", "e": "inactive", "cdc_is_delete": "false"},
allValues: map[string]string{"a": "2", "b": "2nd test", "e": "inactive", "deleted": "false"},
},
{
keyValues: []string{"2nd test", "2"},
allValues: map[string]string{"a": "2", "b": "2nd test", "e": "NULL", "cdc_is_delete": "true"},
allValues: map[string]string{"a": "2", "b": "2nd test", "e": "NULL", "deleted": "true"},
},
},
},
Expand Down Expand Up @@ -356,7 +356,7 @@ $$`)
"INSERT INTO foo (a, b) VALUES (123, 'select_if')",
"DELETE FROM foo where a=123",
},
stmt: "SELECT IF(cdc_is_delete(),'deleted',a::string) AS conditional FROM foo",
stmt: "SELECT IF(event_op() = 'delete','deleted',a::string) AS conditional FROM foo",
expectMainFamily: []decodeExpectation{
{
keyValues: []string{"select_if", "123"},
Expand Down Expand Up @@ -388,7 +388,7 @@ $$`)
actions: []string{
"INSERT INTO foo (a, b, h) VALUES (1, 'hello', 'invisible')",
},
stmt: "SELECT a, tableoid, h FROM foo WHERE crdb_internal_mvcc_timestamp = cdc_mvcc_timestamp()",
stmt: "SELECT a, tableoid, h FROM foo WHERE crdb_internal_mvcc_timestamp > 0",
expectMainFamily: []decodeExpectation{
{
keyValues: []string{"hello", "1"},
Expand All @@ -404,7 +404,7 @@ $$`)
actions: []string{
"INSERT INTO foo (a, b, h) VALUES (1, 'hello', 'invisible')",
},
stmt: "SELECT a, tableoid, h FROM foo WHERE crdb_internal_mvcc_timestamp = cdc_mvcc_timestamp()",
stmt: "SELECT a, tableoid, h FROM foo WHERE crdb_internal_mvcc_timestamp > 0",
expectErr: `column "h" does not exist`,
},
{
Expand Down Expand Up @@ -496,7 +496,7 @@ $$`)
"SELECT x, 'only_some_deleted_values', x::string FROM s",
},
actions: []string{"DELETE FROM foo WHERE b='only_some_deleted_values'"},
stmt: `SELECT * FROM foo WHERE cdc_is_delete() AND (cdc_prev).a % 33 = 0`,
stmt: `SELECT * FROM foo WHERE event_op() = 'delete' AND (cdc_prev).a % 33 = 0`,
expectMainFamily: repeatExpectation(decodeExpectation{expectUnwatchedErr: true}, 100),
expectOnlyCFamily: func() (expectations []decodeExpectation) {
for i := 1; i <= 100; i++ {
Expand Down Expand Up @@ -722,10 +722,9 @@ func slurpValues(t *testing.T, r cdcevent.Row) map[string]string {
}

func randEncDatumPrimaryFamily(
t *testing.T, desc catalog.TableDescriptor,
t *testing.T, rng *rand.Rand, desc catalog.TableDescriptor,
) (row rowenc.EncDatumRow) {
t.Helper()
rng, _ := randutil.NewTestRand()

family, err := catalog.MustFindFamilyByID(desc, 0 /* id */)
require.NoError(t, err)
Expand Down Expand Up @@ -781,8 +780,9 @@ func newEvaluatorWithNormCheck(
return nil, err
}

const withDiff = true
return NewEvaluator(norm.SelectClause, execCfg, username.RootUserName(),
defaultDBSessionData, hlc.Timestamp{}), nil
defaultDBSessionData, hlc.Timestamp{}, withDiff), nil
}

var defaultDBSessionData = sessiondatapb.SessionData{
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/cdceval/func_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ $$`)
},
{
testName: "cdc name without schema",
fnName: tree.MakeUnresolvedName("cdc_mvcc_timestamp"),
fnName: tree.MakeUnresolvedName("changefeed_creation_timestamp"),
expectedSchema: "public",
},
{
testName: "uppercase cdc name without schema",
fnName: tree.MakeUnresolvedName("cdc_mVCC_timeStamp"),
fnName: tree.MakeUnresolvedName("changefeed_creATIon_TimeStamp"),
expectedSchema: "public",
},
{
Expand Down
41 changes: 18 additions & 23 deletions pkg/ccl/changefeedccl/cdceval/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,38 +90,26 @@ var cdcFunctions = map[string]*tree.ResolvedFunctionDefinition{
//"st_asgeojson",
//"st_estimatedextent",

// NB: even though some cdc functions appear to be stable (e.g. cdc_is_delete()),
// NB: even though some cdc functions appear to be stable (e.g. event_op()),
// we should not mark custom CDC functions as stable. Doing so will cause
// optimizer to (constant) fold this function during optimization step -- something
// we definitely don't want to do because we need to evaluate those functions
// for each event.
"cdc_is_delete": makeCDCBuiltIn(
"cdc_is_delete",
"event_op": makeCDCBuiltIn(
"event_op",
tree.Overload{
Types: tree.ParamTypes{},
ReturnType: tree.FixedReturnType(types.Bool),
ReturnType: tree.FixedReturnType(types.String),
Fn: func(ctx context.Context, evalCtx *eval.Context, datums tree.Datums) (tree.Datum, error) {
rowEvalCtx := rowEvalContextFromEvalContext(evalCtx)
if rowEvalCtx.updatedRow.IsDeleted() {
return tree.DBoolTrue, nil
}
return tree.DBoolFalse, nil
return rowEvalCtx.op, nil
},
Info: "Returns true if the event is a deletion",
Info: "Returns 'insert', 'update', 'upsert' or 'delete' to describe the type of the operation.",
Volatility: volatility.Volatile,
}),
"cdc_mvcc_timestamp": cdcTimestampBuiltin(
"cdc_mvcc_timestamp",
"Returns MVCC timestamp of the event",
volatility.Volatile,
types.Decimal,
func(rowEvalCtx *rowEvalContext) hlc.Timestamp {
return rowEvalCtx.updatedRow.MvccTimestamp
},
),
"cdc_updated_timestamp": cdcTimestampBuiltin(
"cdc_updated_timestamp",
"Returns schema timestamp of the event",
"event_schema_timestamp": cdcTimestampBuiltin(
"event_schema_timestamp",
"Returns schema timestamp of the event.",
volatility.Volatile,
types.Decimal,
func(rowEvalCtx *rowEvalContext) hlc.Timestamp {
Expand All @@ -130,7 +118,7 @@ var cdcFunctions = map[string]*tree.ResolvedFunctionDefinition{
),
"changefeed_creation_timestamp": cdcTimestampBuiltin(
"changefeed_creation_timestamp",
"Returns changefeed creation time",
"Returns changefeed creation time.",
volatility.Stable,
types.Decimal,
func(rowEvalCtx *rowEvalContext) hlc.Timestamp {
Expand All @@ -139,6 +127,13 @@ var cdcFunctions = map[string]*tree.ResolvedFunctionDefinition{
),
}

var (
eventTypeInsert = tree.NewDString("insert")
eventTypeUpdate = tree.NewDString("update")
eventTypeUpsert = tree.NewDString("upsert")
eventTypeDelete = tree.NewDString("delete")
)

const cdcFnCategory = "CDC builtin"

var cdcFnProps = &tree.FunctionProperties{
Expand Down Expand Up @@ -253,7 +248,7 @@ func init() {
case volatility.Stable:
// If the stable function is not on the white list,
// then it is blacklisted.
if _, whitelisted := cdcFunctions[fnDef.Name]; !whitelisted {
if _, allowed := cdcFunctions[fnDef.Name]; !allowed {
functionDenyList[fnDef.Name] = struct{}{}
}
}
Expand Down
Loading

0 comments on commit 3c4d3ef

Please sign in to comment.