Skip to content

Commit

Permalink
changefeedccl: Rename and remove CDC functions
Browse files Browse the repository at this point in the history
This PR renames and removes CDC-specific functions,
while maintaining backward compatibility.

* `cdc_is_delete()` function removed.  It is replaced
  with the `event_op()` function, which returns the `insert`, `update`
  or `delete` "op" corresponding to the operation that was
  performed to generate the changefeed event.
* `cdc_mvcc_timestamp()` function removed.  This information
  can be accessed via cockroach standard system column
  `crdb_internal_mvcc_timestamp`.  The same timestamp column
  is available in the previous row state as
  `(cdc_prev).crdb_internal_mvcc_timestamp`.
* `cdc_updated_timestamp()` function renamed as
  `event_schema_timestamp()`

Fixes #92482
Epic: CRDB-17161

Release note (enterprise change): Deprecate and replace some of the
changefeed specific functions made available in preview mode in 22.2
release.   While these functions are deprecated, old changefeed
transformations should continue to function properly.
Customers are encouraged to closely monitor their changefeeds
created during upgrade.  While effort was made to maintain backward
compatibility, the updated changefeed transformation may produce
slightly different output (different column names, etc).
  • Loading branch information
Yevgeniy Miretskiy committed Jan 31, 2023
1 parent 8ae413b commit 36e6b88
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 52 deletions.
45 changes: 34 additions & 11 deletions pkg/ccl/changefeedccl/cdceval/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
package cdceval

import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/errors"
)
Expand All @@ -23,18 +25,39 @@ func RewritePreviewExpression(oldExpr *tree.SelectClause) (*tree.SelectClause, e
default:
return true, expr, nil
case *tree.FuncExpr:
// Old style cdc_prev() function returned JSONb object.
// Replace this call with a call to row_to_json((cdc_prev).*)
if n, ok := t.Func.FunctionReference.(*tree.UnresolvedName); ok && n.Parts[0] == "cdc_prev" {
rowToJSON := &tree.FuncExpr{
Func: tree.ResolvableFunctionReference{
FunctionReference: tree.FunDefs["row_to_json"],
},
Exprs: tree.Exprs{
&tree.TupleStar{Expr: tree.NewUnresolvedName("cdc_prev")},
},
if n, ok := t.Func.FunctionReference.(*tree.UnresolvedName); ok {
switch n.Parts[0] {
case "cdc_prev":
// Old style cdc_prev() function returned JSONb object.
// Replace this call with a call to row_to_json((cdc_prev).*)
rowToJSON := &tree.FuncExpr{
Func: tree.ResolvableFunctionReference{
FunctionReference: tree.FunDefs["row_to_json"],
},
Exprs: tree.Exprs{
&tree.TupleStar{Expr: tree.NewUnresolvedName("cdc_prev")},
},
}
return true, rowToJSON, nil
case "cdc_is_delete":
// Old style cdc_is_delete returned boolean; use event_op instead.
newExpr, err := parser.ParseExpr("(event_op() = 'delete')")
if err != nil {
return false, expr, err
}
return true, newExpr, nil
case "cdc_mvcc_timestamp":
// Old cdc_mvcc_timestamp() function gets replaced with crdb_internal_mvcc_timestamp column.
return true, tree.NewUnresolvedName(colinfo.MVCCTimestampColumnName), nil
case "cdc_updated_timestamp":
// Old cdc_updated_timestamp gets replaced with event_schema_timestamp.
newExpr := &tree.FuncExpr{
Func: tree.ResolvableFunctionReference{
FunctionReference: tree.NewUnresolvedName("changefeed_event_schema_timestamp"),
},
}
return true, newExpr, nil
}
return true, rowToJSON, nil
}
}
return true, expr, nil
Expand Down
12 changes: 12 additions & 0 deletions pkg/ccl/changefeedccl/cdceval/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,18 @@ func TestCompatRewrite(t *testing.T) {
oldExpr: "SELECT foo.*, cdc_prev() FROM " + fooRef + " WHERE cdc_prev()->>'field' = 'blah'",
newExpr: "SELECT foo.*, row_to_json((cdc_prev).*) FROM " + fooRef + " WHERE (row_to_json((cdc_prev).*)->>'field') = 'blah'",
},
{
oldExpr: "SELECT foo.*, cdc_is_delete() FROM " + fooRef,
newExpr: "SELECT foo.*, (event_op() = 'delete') FROM " + fooRef,
},
{
oldExpr: "SELECT foo.*, cdc_is_delete() AS was_deleted FROM " + fooRef,
newExpr: "SELECT foo.*, (event_op() = 'delete') AS was_deleted FROM " + fooRef,
},
{
oldExpr: "SELECT foo.*, cdc_mvcc_timestamp() FROM " + fooRef,
newExpr: "SELECT foo.*, crdb_internal_mvcc_timestamp FROM " + fooRef,
},
} {
sc, err := ParseChangefeedExpression(tc.oldExpr)
require.NoError(t, err)
Expand Down
8 changes: 6 additions & 2 deletions pkg/ccl/changefeedccl/cdceval/expr_eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (e *familyEvaluator) eval(
}

// Setup context.
if err := e.setupContextForRow(ctx, updatedRow); err != nil {
if err := e.setupContextForRow(ctx, updatedRow, prevRow.IsDeleted()); err != nil {
return cdcevent.Row{}, err
}

Expand Down Expand Up @@ -447,9 +447,12 @@ func (e *familyEvaluator) copyPrevRow(prev cdcevent.Row) error {

// setupContextForRow configures evaluation context with the provided row
// information.
func (e *familyEvaluator) setupContextForRow(ctx context.Context, updated cdcevent.Row) error {
func (e *familyEvaluator) setupContextForRow(
ctx context.Context, updated cdcevent.Row, isNew bool,
) error {
e.rowEvalCtx.ctx = ctx
e.rowEvalCtx.updatedRow = updated
e.rowEvalCtx.isNewRow = isNew
return nil
}

Expand Down Expand Up @@ -479,6 +482,7 @@ type rowEvalContext struct {
ctx context.Context
startTime hlc.Timestamp
updatedRow cdcevent.Row
isNewRow bool
}

// cdcAnnotationAddr is the address used to store relevant information
Expand Down
14 changes: 7 additions & 7 deletions pkg/ccl/changefeedccl/cdceval/expr_eval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,15 @@ $$`)
"INSERT INTO foo (a, b) VALUES (2, '2nd test')",
"DELETE FROM foo WHERE a=2 AND b='2nd test'",
},
stmt: "SELECT *, cdc_is_delete() FROM foo WHERE 'hello' != 'world'",
stmt: "SELECT *, event_op() = 'delete' AS deleted FROM foo WHERE 'hello' != 'world'",
expectMainFamily: []decodeExpectation{
{
keyValues: []string{"2nd test", "2"},
allValues: map[string]string{"a": "2", "b": "2nd test", "e": "inactive", "cdc_is_delete": "false"},
allValues: map[string]string{"a": "2", "b": "2nd test", "e": "inactive", "deleted": "false"},
},
{
keyValues: []string{"2nd test", "2"},
allValues: map[string]string{"a": "2", "b": "2nd test", "e": "NULL", "cdc_is_delete": "true"},
allValues: map[string]string{"a": "2", "b": "2nd test", "e": "NULL", "deleted": "true"},
},
},
},
Expand Down Expand Up @@ -356,7 +356,7 @@ $$`)
"INSERT INTO foo (a, b) VALUES (123, 'select_if')",
"DELETE FROM foo where a=123",
},
stmt: "SELECT IF(cdc_is_delete(),'deleted',a::string) AS conditional FROM foo",
stmt: "SELECT IF(event_op() = 'delete','deleted',a::string) AS conditional FROM foo",
expectMainFamily: []decodeExpectation{
{
keyValues: []string{"select_if", "123"},
Expand Down Expand Up @@ -388,7 +388,7 @@ $$`)
actions: []string{
"INSERT INTO foo (a, b, h) VALUES (1, 'hello', 'invisible')",
},
stmt: "SELECT a, tableoid, h FROM foo WHERE crdb_internal_mvcc_timestamp = cdc_mvcc_timestamp()",
stmt: "SELECT a, tableoid, h FROM foo WHERE crdb_internal_mvcc_timestamp > 0",
expectMainFamily: []decodeExpectation{
{
keyValues: []string{"hello", "1"},
Expand All @@ -404,7 +404,7 @@ $$`)
actions: []string{
"INSERT INTO foo (a, b, h) VALUES (1, 'hello', 'invisible')",
},
stmt: "SELECT a, tableoid, h FROM foo WHERE crdb_internal_mvcc_timestamp = cdc_mvcc_timestamp()",
stmt: "SELECT a, tableoid, h FROM foo WHERE crdb_internal_mvcc_timestamp > 0",
expectErr: `column "h" does not exist`,
},
{
Expand Down Expand Up @@ -496,7 +496,7 @@ $$`)
"SELECT x, 'only_some_deleted_values', x::string FROM s",
},
actions: []string{"DELETE FROM foo WHERE b='only_some_deleted_values'"},
stmt: `SELECT * FROM foo WHERE cdc_is_delete() AND (cdc_prev).a % 33 = 0`,
stmt: `SELECT * FROM foo WHERE event_op() = 'delete' AND (cdc_prev).a % 33 = 0`,
expectMainFamily: repeatExpectation(decodeExpectation{expectUnwatchedErr: true}, 100),
expectOnlyCFamily: func() (expectations []decodeExpectation) {
for i := 1; i <= 100; i++ {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/cdceval/func_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ $$`)
},
{
testName: "cdc name without schema",
fnName: tree.MakeUnresolvedName("cdc_mvcc_timestamp"),
fnName: tree.MakeUnresolvedName("changefeed_creation_timestamp"),
expectedSchema: "public",
},
{
testName: "uppercase cdc name without schema",
fnName: tree.MakeUnresolvedName("cdc_mVCC_timeStamp"),
fnName: tree.MakeUnresolvedName("changefeed_creATIon_TimeStamp"),
expectedSchema: "public",
},
{
Expand Down
42 changes: 21 additions & 21 deletions pkg/ccl/changefeedccl/cdceval/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,38 +90,32 @@ var cdcFunctions = map[string]*tree.ResolvedFunctionDefinition{
//"st_asgeojson",
//"st_estimatedextent",

// NB: even though some cdc functions appear to be stable (e.g. cdc_is_delete()),
// NB: even though some cdc functions appear to be stable (e.g. event_op()),
// we should not mark custom CDC functions as stable. Doing so will cause
// optimizer to (constant) fold this function during optimization step -- something
// we definitely don't want to do because we need to evaluate those functions
// for each event.
"cdc_is_delete": makeCDCBuiltIn(
"cdc_is_delete",
"event_op": makeCDCBuiltIn(
"event_op",
tree.Overload{
Types: tree.ParamTypes{},
ReturnType: tree.FixedReturnType(types.Bool),
ReturnType: tree.FixedReturnType(types.String),
Fn: func(ctx context.Context, evalCtx *eval.Context, datums tree.Datums) (tree.Datum, error) {
rowEvalCtx := rowEvalContextFromEvalContext(evalCtx)
if rowEvalCtx.updatedRow.IsDeleted() {
return tree.DBoolTrue, nil
return eventTypeDelete, nil
}
return tree.DBoolFalse, nil
if rowEvalCtx.isNewRow {
return eventTypeInsert, nil
}
return eventTypeUpdate, nil
},
Info: "Returns true if the event is a deletion",
Info: "Returns 'insert', 'update', or 'delete' to describe the type of the operation.",
Volatility: volatility.Volatile,
}),
"cdc_mvcc_timestamp": cdcTimestampBuiltin(
"cdc_mvcc_timestamp",
"Returns MVCC timestamp of the event",
volatility.Volatile,
types.Decimal,
func(rowEvalCtx *rowEvalContext) hlc.Timestamp {
return rowEvalCtx.updatedRow.MvccTimestamp
},
),
"cdc_updated_timestamp": cdcTimestampBuiltin(
"cdc_updated_timestamp",
"Returns schema timestamp of the event",
"event_schema_timestamp": cdcTimestampBuiltin(
"event_schema_timestamp",
"Returns schema timestamp of the event.",
volatility.Volatile,
types.Decimal,
func(rowEvalCtx *rowEvalContext) hlc.Timestamp {
Expand All @@ -130,7 +124,7 @@ var cdcFunctions = map[string]*tree.ResolvedFunctionDefinition{
),
"changefeed_creation_timestamp": cdcTimestampBuiltin(
"changefeed_creation_timestamp",
"Returns changefeed creation time",
"Returns changefeed creation time.",
volatility.Stable,
types.Decimal,
func(rowEvalCtx *rowEvalContext) hlc.Timestamp {
Expand All @@ -139,6 +133,12 @@ var cdcFunctions = map[string]*tree.ResolvedFunctionDefinition{
),
}

var (
eventTypeInsert = tree.NewDString("insert")
eventTypeUpdate = tree.NewDString("update")
eventTypeDelete = tree.NewDString("delete")
)

const cdcFnCategory = "CDC builtin"

var cdcFnProps = &tree.FunctionProperties{
Expand Down Expand Up @@ -253,7 +253,7 @@ func init() {
case volatility.Stable:
// If the stable function is not on the white list,
// then it is blacklisted.
if _, whitelisted := cdcFunctions[fnDef.Name]; !whitelisted {
if _, allowed := cdcFunctions[fnDef.Name]; !allowed {
functionDenyList[fnDef.Name] = struct{}{}
}
}
Expand Down
13 changes: 6 additions & 7 deletions pkg/ccl/changefeedccl/cdceval/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) {
for fn, preferredOverload := range map[string]preferredFn{
"statement_timestamp": expectTSTZ,
"transaction_timestamp": expectTSTZ,
"cdc_mvcc_timestamp": expectHLC,
"cdc_updated_timestamp": expectHLC,
"event_schema_timestamp": expectHLC,
"changefeed_creation_timestamp": expectHLC,
} {
t.Run(fn, func(t *testing.T) {
Expand All @@ -85,7 +84,7 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) {

targetTS := rowTS
switch fn {
case "cdc_updated_timestamp":
case "event_schema_timestamp":
targetTS = schemaTS
case "changefeed_creation_timestamp":
targetTS = createTS
Expand Down Expand Up @@ -132,7 +131,7 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) {
// should have no bearing on the returned values -- we should see
// the same thing we saw before.
updatedExpectations = initialExpectations
case "cdc_updated_timestamp":
case "event_schema_timestamp":
targetTS = testRow.SchemaTS
fallthrough
default:
Expand Down Expand Up @@ -174,11 +173,11 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) {
})
})

t.Run("cdc_is_delete", func(t *testing.T) {
t.Run("event_op", func(t *testing.T) {
schemaTS := s.Clock().Now()
testRow := makeEventRow(t, desc, schemaTS, false, s.Clock().Now())
e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor,
"SELECT cdc_is_delete() FROM foo")
"SELECT event_op() = 'delete' AS row_deleted FROM foo")
require.NoError(t, err)
defer e.Close()

Expand All @@ -187,7 +186,7 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) {
p, err := e.Eval(ctx, testRow, cdcevent.Row{})
require.NoError(t, err)
require.Equal(t,
map[string]string{"cdc_is_delete": fmt.Sprintf("%t", expectDelete)},
map[string]string{"row_deleted": fmt.Sprintf("%t", expectDelete)},
slurpValues(t, p))
}
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdceval/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func TestSelectClauseRequiresPrev(t *testing.T) {
{
name: "nested call to cdc_prev",
desc: descs[`foo`],
stmt: "SELECT jsonb_build_object('op',IF(cdc_is_delete(),'u',IF(row_to_json((cdc_prev).*)::string='null','c','u'))) from foo",
stmt: "SELECT jsonb_build_object('op',IF(event_op() = 'delete','u',IF(row_to_json((cdc_prev).*)::string='null','c','u'))) from foo",
requiresPrev: true,
},
{
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7075,7 +7075,7 @@ INSERT INTO foo (a, b, e) VALUES (2, 'two', 'closed');
CREATE CHANGEFEED
WITH schema_change_policy='stop'
AS SELECT * FROM `+fromClause+`
WHERE e IN ('open', 'closed') AND NOT cdc_is_delete()`)
WHERE e IN ('open', 'closed') AND event_op() != 'delete'`)
defer closeFeed(t, feed)

assertPayloads(t, feed, []string{
Expand Down

0 comments on commit 36e6b88

Please sign in to comment.