Skip to content

Commit

Permalink
changefeedccl: Rename and remove CDC functions
Browse files Browse the repository at this point in the history
This PR renames and removes CDC specific functions,
while maintaining backward compatability.

* `cdc_is_delete()` function removed.  It is replaced
  with `event_op()` function which returns a string describing
  the type of event.  If the changefeed is running with `diff`
  option, then this function returns `insert`, `update`, or
  `delete`.  If changefeed is running without the `diff` option,
  we can't tell an update from insert, so this function returns
  `upsert` or `delete`.
* `cdc_mvcc_timestamp()` function removed.  This information
  can be accessed via cockroach standard system column
  `crdb_internal_mvcc_timestamp`.  The same timestamp column
  is avaiable in the previous row state
  `(cdc_prev).crdb_internal_mvcc_timestamp`
* `cdc_updated_timestamp()` function renamed as
  `event_schema_timestamp()`

Fixes #92482
Epic: CRDB-17161

Release note (enterprise change): Deprecate and replace some of the
changefeed specific functions made available in preview mode in 22.2
release.   While these functions are deprecated, old changefeed
transformations should continue to function properly.
Customers are encouraged to closely monitor their changefeeds
created during upgrade.  While effort was made to maintain backward
compatibility, the updated changefeed transformation may produce
slightly different output (different column names, etc).
  • Loading branch information
Yevgeniy Miretskiy committed Feb 13, 2023
1 parent 5f310bc commit 6b8c307
Show file tree
Hide file tree
Showing 10 changed files with 229 additions and 114 deletions.
45 changes: 34 additions & 11 deletions pkg/ccl/changefeedccl/cdceval/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
package cdceval

import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/errors"
)
Expand All @@ -23,18 +25,39 @@ func RewritePreviewExpression(oldExpr *tree.SelectClause) (*tree.SelectClause, e
default:
return true, expr, nil
case *tree.FuncExpr:
// Old style cdc_prev() function returned JSONb object.
// Replace this call with a call to row_to_json((cdc_prev).*)
if n, ok := t.Func.FunctionReference.(*tree.UnresolvedName); ok && n.Parts[0] == "cdc_prev" {
rowToJSON := &tree.FuncExpr{
Func: tree.ResolvableFunctionReference{
FunctionReference: tree.FunDefs["row_to_json"],
},
Exprs: tree.Exprs{
&tree.TupleStar{Expr: tree.NewUnresolvedName("cdc_prev")},
},
if n, ok := t.Func.FunctionReference.(*tree.UnresolvedName); ok {
switch n.Parts[0] {
case "cdc_prev":
// Old style cdc_prev() function returned JSONb object.
// Replace this call with a call to row_to_json((cdc_prev).*)
rowToJSON := &tree.FuncExpr{
Func: tree.ResolvableFunctionReference{
FunctionReference: tree.FunDefs["row_to_json"],
},
Exprs: tree.Exprs{
&tree.TupleStar{Expr: tree.NewUnresolvedName("cdc_prev")},
},
}
return true, rowToJSON, nil
case "cdc_is_delete":
// Old style cdc_is_delete returned boolean; use event_op instead.
newExpr, err := parser.ParseExpr("(event_op() = 'delete')")
if err != nil {
return false, expr, err
}
return true, newExpr, nil
case "cdc_mvcc_timestamp":
// Old cdc_mvcc_timestamp() function gets replaced with crdb_intenral_mvcc_timestamp column.
return true, tree.NewUnresolvedName(colinfo.MVCCTimestampColumnName), nil
case "cdc_updated_timestamp":
// Old cdc_updated_timestamp gets replaced with event_schema_timestamp.
newExpr := &tree.FuncExpr{
Func: tree.ResolvableFunctionReference{
FunctionReference: tree.NewUnresolvedName("changefeed_event_schema_timestamp"),
},
}
return true, newExpr, nil
}
return true, rowToJSON, nil
}
}
return true, expr, nil
Expand Down
12 changes: 12 additions & 0 deletions pkg/ccl/changefeedccl/cdceval/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,18 @@ func TestCompatRewrite(t *testing.T) {
oldExpr: "SELECT foo.*, cdc_prev() FROM " + fooRef + " WHERE cdc_prev()->>'field' = 'blah'",
newExpr: "SELECT foo.*, row_to_json((cdc_prev).*) FROM " + fooRef + " WHERE (row_to_json((cdc_prev).*)->>'field') = 'blah'",
},
{
oldExpr: "SELECT foo.*, cdc_is_delete() FROM " + fooRef,
newExpr: "SELECT foo.*, (event_op() = 'delete') FROM " + fooRef,
},
{
oldExpr: "SELECT foo.*, cdc_is_delete() AS was_deleted FROM " + fooRef,
newExpr: "SELECT foo.*, (event_op() = 'delete') AS was_deleted FROM " + fooRef,
},
{
oldExpr: "SELECT foo.*, cdc_mvcc_timestamp() FROM " + fooRef,
newExpr: "SELECT foo.*, crdb_internal_mvcc_timestamp FROM " + fooRef,
},
} {
sc, err := ParseChangefeedExpression(tc.oldExpr)
require.NoError(t, err)
Expand Down
38 changes: 33 additions & 5 deletions pkg/ccl/changefeedccl/cdceval/expr_eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ type Evaluator struct {
execCfg *sql.ExecutorConfig
user username.SQLUsername
sessionData sessiondatapb.SessionData

familyEval map[descpb.FamilyID]*familyEvaluator
withDiff bool
familyEval map[descpb.FamilyID]*familyEvaluator
}

// familyEvaluator is a responsible for evaluating expressions in CDC
Expand Down Expand Up @@ -82,13 +82,15 @@ func NewEvaluator(
user username.SQLUsername,
sd sessiondatapb.SessionData,
statementTS hlc.Timestamp,
withDiff bool,
) *Evaluator {
return &Evaluator{
sc: sc,
execCfg: execCfg,
user: user,
sessionData: sd,
statementTS: statementTS,
withDiff: withDiff,
familyEval: make(map[descpb.FamilyID]*familyEvaluator, 1), // usually, just 1 family.
}
}
Expand All @@ -101,6 +103,7 @@ func newFamilyEvaluator(
user username.SQLUsername,
sd sessiondatapb.SessionData,
statementTS hlc.Timestamp,
withDiff bool,
) *familyEvaluator {
e := familyEvaluator{
targetFamilyID: targetFamilyID,
Expand All @@ -113,6 +116,8 @@ func newFamilyEvaluator(
rowCh: make(chan tree.Datums, 1),
}
e.rowEvalCtx.startTime = statementTS
e.rowEvalCtx.withDiff = withDiff

// Arrange to be notified when event does not match predicate.
predicateAsProjection(e.norm)

Expand Down Expand Up @@ -143,7 +148,9 @@ func (e *Evaluator) Eval(

fe, ok := e.familyEval[updatedRow.FamilyID]
if !ok {
fe = newFamilyEvaluator(e.sc, updatedRow.FamilyID, e.execCfg, e.user, e.sessionData, e.statementTS)
fe = newFamilyEvaluator(
e.sc, updatedRow.FamilyID, e.execCfg, e.user, e.sessionData, e.statementTS, e.withDiff,
)
e.familyEval[updatedRow.FamilyID] = fe
}

Expand Down Expand Up @@ -183,7 +190,7 @@ func (e *familyEvaluator) eval(
}

// Setup context.
if err := e.setupContextForRow(ctx, updatedRow); err != nil {
if err := e.setupContextForRow(ctx, updatedRow, prevRow); err != nil {
return cdcevent.Row{}, err
}

Expand Down Expand Up @@ -447,9 +454,28 @@ func (e *familyEvaluator) copyPrevRow(prev cdcevent.Row) error {

// setupContextForRow configures evaluation context with the provided row
// information.
func (e *familyEvaluator) setupContextForRow(ctx context.Context, updated cdcevent.Row) error {
func (e *familyEvaluator) setupContextForRow(
ctx context.Context, updated cdcevent.Row, prevRow cdcevent.Row,
) error {
e.rowEvalCtx.ctx = ctx
e.rowEvalCtx.updatedRow = updated

if updated.IsDeleted() {
e.rowEvalCtx.op = eventTypeDelete
} else {
// Insert or update.
if e.rowEvalCtx.withDiff {
if prevRow.IsInitialized() {
e.rowEvalCtx.op = eventTypeUpdate
} else {
e.rowEvalCtx.op = eventTypeInsert
}
} else {
// Without diff option we can't tell insert from update; so, use upsert.
e.rowEvalCtx.op = eventTypeUpsert
}
}

return nil
}

Expand Down Expand Up @@ -478,7 +504,9 @@ func (e *familyEvaluator) closeErr() error {
type rowEvalContext struct {
ctx context.Context
startTime hlc.Timestamp
withDiff bool
updatedRow cdcevent.Row
op tree.Datum
}

// cdcAnnotationAddr is the address used to store relevant information
Expand Down
22 changes: 11 additions & 11 deletions pkg/ccl/changefeedccl/cdceval/expr_eval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package cdceval
import (
"context"
"fmt"
"math/rand"
"sort"
"strconv"
"testing"
Expand All @@ -34,7 +35,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -145,15 +145,15 @@ $$`)
"INSERT INTO foo (a, b) VALUES (2, '2nd test')",
"DELETE FROM foo WHERE a=2 AND b='2nd test'",
},
stmt: "SELECT *, cdc_is_delete() FROM foo WHERE 'hello' != 'world'",
stmt: "SELECT *, event_op() = 'delete' AS deleted FROM foo WHERE 'hello' != 'world'",
expectMainFamily: []decodeExpectation{
{
keyValues: []string{"2nd test", "2"},
allValues: map[string]string{"a": "2", "b": "2nd test", "e": "inactive", "cdc_is_delete": "false"},
allValues: map[string]string{"a": "2", "b": "2nd test", "e": "inactive", "deleted": "false"},
},
{
keyValues: []string{"2nd test", "2"},
allValues: map[string]string{"a": "2", "b": "2nd test", "e": "NULL", "cdc_is_delete": "true"},
allValues: map[string]string{"a": "2", "b": "2nd test", "e": "NULL", "deleted": "true"},
},
},
},
Expand Down Expand Up @@ -356,7 +356,7 @@ $$`)
"INSERT INTO foo (a, b) VALUES (123, 'select_if')",
"DELETE FROM foo where a=123",
},
stmt: "SELECT IF(cdc_is_delete(),'deleted',a::string) AS conditional FROM foo",
stmt: "SELECT IF(event_op() = 'delete','deleted',a::string) AS conditional FROM foo",
expectMainFamily: []decodeExpectation{
{
keyValues: []string{"select_if", "123"},
Expand Down Expand Up @@ -388,7 +388,7 @@ $$`)
actions: []string{
"INSERT INTO foo (a, b, h) VALUES (1, 'hello', 'invisible')",
},
stmt: "SELECT a, tableoid, h FROM foo WHERE crdb_internal_mvcc_timestamp = cdc_mvcc_timestamp()",
stmt: "SELECT a, tableoid, h FROM foo WHERE crdb_internal_mvcc_timestamp > 0",
expectMainFamily: []decodeExpectation{
{
keyValues: []string{"hello", "1"},
Expand All @@ -404,7 +404,7 @@ $$`)
actions: []string{
"INSERT INTO foo (a, b, h) VALUES (1, 'hello', 'invisible')",
},
stmt: "SELECT a, tableoid, h FROM foo WHERE crdb_internal_mvcc_timestamp = cdc_mvcc_timestamp()",
stmt: "SELECT a, tableoid, h FROM foo WHERE crdb_internal_mvcc_timestamp > 0",
expectErr: `column "h" does not exist`,
},
{
Expand Down Expand Up @@ -496,7 +496,7 @@ $$`)
"SELECT x, 'only_some_deleted_values', x::string FROM s",
},
actions: []string{"DELETE FROM foo WHERE b='only_some_deleted_values'"},
stmt: `SELECT * FROM foo WHERE cdc_is_delete() AND (cdc_prev).a % 33 = 0`,
stmt: `SELECT * FROM foo WHERE event_op() = 'delete' AND (cdc_prev).a % 33 = 0`,
expectMainFamily: repeatExpectation(decodeExpectation{expectUnwatchedErr: true}, 100),
expectOnlyCFamily: func() (expectations []decodeExpectation) {
for i := 1; i <= 100; i++ {
Expand Down Expand Up @@ -722,10 +722,9 @@ func slurpValues(t *testing.T, r cdcevent.Row) map[string]string {
}

func randEncDatumPrimaryFamily(
t *testing.T, desc catalog.TableDescriptor,
t *testing.T, rng *rand.Rand, desc catalog.TableDescriptor,
) (row rowenc.EncDatumRow) {
t.Helper()
rng, _ := randutil.NewTestRand()

family, err := catalog.MustFindFamilyByID(desc, 0 /* id */)
require.NoError(t, err)
Expand Down Expand Up @@ -781,8 +780,9 @@ func newEvaluatorWithNormCheck(
return nil, err
}

const withDiff = true
return NewEvaluator(norm.SelectClause, execCfg, username.RootUserName(),
defaultDBSessionData, hlc.Timestamp{}), nil
defaultDBSessionData, hlc.Timestamp{}, withDiff), nil
}

var defaultDBSessionData = sessiondatapb.SessionData{
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/cdceval/func_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ $$`)
},
{
testName: "cdc name without schema",
fnName: tree.MakeUnresolvedName("cdc_mvcc_timestamp"),
fnName: tree.MakeUnresolvedName("changefeed_creation_timestamp"),
expectedSchema: "public",
},
{
testName: "uppercase cdc name without schema",
fnName: tree.MakeUnresolvedName("cdc_mVCC_timeStamp"),
fnName: tree.MakeUnresolvedName("changefeed_creATIon_TimeStamp"),
expectedSchema: "public",
},
{
Expand Down
41 changes: 18 additions & 23 deletions pkg/ccl/changefeedccl/cdceval/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,38 +90,26 @@ var cdcFunctions = map[string]*tree.ResolvedFunctionDefinition{
//"st_asgeojson",
//"st_estimatedextent",

// NB: even though some cdc functions appear to be stable (e.g. cdc_is_delete()),
// NB: even though some cdc functions appear to be stable (e.g. event_op()),
// we should not mark custom CDC functions as stable. Doing so will cause
// optimizer to (constant) fold this function during optimization step -- something
// we definitely don't want to do because we need to evaluate those functions
// for each event.
"cdc_is_delete": makeCDCBuiltIn(
"cdc_is_delete",
"event_op": makeCDCBuiltIn(
"event_op",
tree.Overload{
Types: tree.ParamTypes{},
ReturnType: tree.FixedReturnType(types.Bool),
ReturnType: tree.FixedReturnType(types.String),
Fn: func(ctx context.Context, evalCtx *eval.Context, datums tree.Datums) (tree.Datum, error) {
rowEvalCtx := rowEvalContextFromEvalContext(evalCtx)
if rowEvalCtx.updatedRow.IsDeleted() {
return tree.DBoolTrue, nil
}
return tree.DBoolFalse, nil
return rowEvalCtx.op, nil
},
Info: "Returns true if the event is a deletion",
Info: "Returns 'insert', 'update', 'upsert' or 'delete' to describe the type of the operation.",
Volatility: volatility.Volatile,
}),
"cdc_mvcc_timestamp": cdcTimestampBuiltin(
"cdc_mvcc_timestamp",
"Returns MVCC timestamp of the event",
volatility.Volatile,
types.Decimal,
func(rowEvalCtx *rowEvalContext) hlc.Timestamp {
return rowEvalCtx.updatedRow.MvccTimestamp
},
),
"cdc_updated_timestamp": cdcTimestampBuiltin(
"cdc_updated_timestamp",
"Returns schema timestamp of the event",
"event_schema_timestamp": cdcTimestampBuiltin(
"event_schema_timestamp",
"Returns schema timestamp of the event.",
volatility.Volatile,
types.Decimal,
func(rowEvalCtx *rowEvalContext) hlc.Timestamp {
Expand All @@ -130,7 +118,7 @@ var cdcFunctions = map[string]*tree.ResolvedFunctionDefinition{
),
"changefeed_creation_timestamp": cdcTimestampBuiltin(
"changefeed_creation_timestamp",
"Returns changefeed creation time",
"Returns changefeed creation time.",
volatility.Stable,
types.Decimal,
func(rowEvalCtx *rowEvalContext) hlc.Timestamp {
Expand All @@ -139,6 +127,13 @@ var cdcFunctions = map[string]*tree.ResolvedFunctionDefinition{
),
}

var (
eventTypeInsert = tree.NewDString("insert")
eventTypeUpdate = tree.NewDString("update")
eventTypeUpsert = tree.NewDString("upsert")
eventTypeDelete = tree.NewDString("delete")
)

const cdcFnCategory = "CDC builtin"

var cdcFnProps = &tree.FunctionProperties{
Expand Down Expand Up @@ -253,7 +248,7 @@ func init() {
case volatility.Stable:
// If the stable function is not on the white list,
// then it is blacklisted.
if _, whitelisted := cdcFunctions[fnDef.Name]; !whitelisted {
if _, allowed := cdcFunctions[fnDef.Name]; !allowed {
functionDenyList[fnDef.Name] = struct{}{}
}
}
Expand Down
Loading

0 comments on commit 6b8c307

Please sign in to comment.