From 5265347498c2ebafc2a5e2e7832af28ceeb6678e Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Fri, 13 Jan 2023 09:02:58 -0500 Subject: [PATCH] changefeedccl: Rename and remove CDC functions This PR renames and removes CDC specific functions, while maintaining backward compatibility. * `cdc_is_delete()` function removed. It is replaced with `event_op()` function which returns a string describing the type of event. If the changefeed is running with `diff` option, then this function returns `insert`, `update`, or `delete`. If changefeed is running without the `diff` option, we can't tell an update from insert, so this function returns `upsert` or `delete`. * `cdc_mvcc_timestamp()` function removed. This information can be accessed via the standard CockroachDB system column `crdb_internal_mvcc_timestamp`. The same timestamp column is available in the previous row state `(cdc_prev).crdb_internal_mvcc_timestamp` * `cdc_updated_timestamp()` function renamed as `event_schema_timestamp()` Fixes #92482 Epic: CRDB-17161 Release note (enterprise change): Deprecate and replace some of the changefeed specific functions made available in preview mode in 22.2 release. While these functions are deprecated, old changefeed transformations should continue to function properly. Customers are encouraged to closely monitor their changefeeds created during upgrade. While effort was made to maintain backward compatibility, the updated changefeed transformation may produce slightly different output (different column names, etc). 
--- pkg/ccl/changefeedccl/cdceval/compat.go | 45 +++-- pkg/ccl/changefeedccl/cdceval/compat_test.go | 12 ++ pkg/ccl/changefeedccl/cdceval/expr_eval.go | 38 +++- .../changefeedccl/cdceval/expr_eval_test.go | 22 +-- .../cdceval/func_resolver_test.go | 4 +- pkg/ccl/changefeedccl/cdceval/functions.go | 41 ++--- .../changefeedccl/cdceval/functions_test.go | 168 ++++++++++++------ .../changefeedccl/cdceval/validation_test.go | 2 +- pkg/ccl/changefeedccl/changefeed_test.go | 2 +- pkg/ccl/changefeedccl/event_processing.go | 9 +- 10 files changed, 229 insertions(+), 114 deletions(-) diff --git a/pkg/ccl/changefeedccl/cdceval/compat.go b/pkg/ccl/changefeedccl/cdceval/compat.go index ae38c8c7e312..6f5dbc9102be 100644 --- a/pkg/ccl/changefeedccl/cdceval/compat.go +++ b/pkg/ccl/changefeedccl/cdceval/compat.go @@ -9,6 +9,8 @@ package cdceval import ( + "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" + "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/errors" ) @@ -23,18 +25,39 @@ func RewritePreviewExpression(oldExpr *tree.SelectClause) (*tree.SelectClause, e default: return true, expr, nil case *tree.FuncExpr: - // Old style cdc_prev() function returned JSONb object. - // Replace this call with a call to row_to_json((cdc_prev).*) - if n, ok := t.Func.FunctionReference.(*tree.UnresolvedName); ok && n.Parts[0] == "cdc_prev" { - rowToJSON := &tree.FuncExpr{ - Func: tree.ResolvableFunctionReference{ - FunctionReference: tree.FunDefs["row_to_json"], - }, - Exprs: tree.Exprs{ - &tree.TupleStar{Expr: tree.NewUnresolvedName("cdc_prev")}, - }, + if n, ok := t.Func.FunctionReference.(*tree.UnresolvedName); ok { + switch n.Parts[0] { + case "cdc_prev": + // Old style cdc_prev() function returned JSONb object. 
+ // Replace this call with a call to row_to_json((cdc_prev).*) + rowToJSON := &tree.FuncExpr{ + Func: tree.ResolvableFunctionReference{ + FunctionReference: tree.FunDefs["row_to_json"], + }, + Exprs: tree.Exprs{ + &tree.TupleStar{Expr: tree.NewUnresolvedName("cdc_prev")}, + }, + } + return true, rowToJSON, nil + case "cdc_is_delete": + // Old style cdc_is_delete returned boolean; use event_op instead. + newExpr, err := parser.ParseExpr("(event_op() = 'delete')") + if err != nil { + return false, expr, err + } + return true, newExpr, nil + case "cdc_mvcc_timestamp": + // Old cdc_mvcc_timestamp() function gets replaced with crdb_internal_mvcc_timestamp column. + return true, tree.NewUnresolvedName(colinfo.MVCCTimestampColumnName), nil + case "cdc_updated_timestamp": + // Old cdc_updated_timestamp gets replaced with event_schema_timestamp. + newExpr := &tree.FuncExpr{ + Func: tree.ResolvableFunctionReference{ + FunctionReference: tree.NewUnresolvedName("event_schema_timestamp"), + }, + } + return true, newExpr, nil } - return true, rowToJSON, nil } } return true, expr, nil diff --git a/pkg/ccl/changefeedccl/cdceval/compat_test.go b/pkg/ccl/changefeedccl/cdceval/compat_test.go index 8d6c5c7c4a24..1b0371bb29d2 100644 --- a/pkg/ccl/changefeedccl/cdceval/compat_test.go +++ b/pkg/ccl/changefeedccl/cdceval/compat_test.go @@ -64,6 +64,18 @@ func TestCompatRewrite(t *testing.T) { oldExpr: "SELECT foo.*, cdc_prev() FROM " + fooRef + " WHERE cdc_prev()->>'field' = 'blah'", newExpr: "SELECT foo.*, row_to_json((cdc_prev).*) FROM " + fooRef + " WHERE (row_to_json((cdc_prev).*)->>'field') = 'blah'", }, + { + oldExpr: "SELECT foo.*, cdc_is_delete() FROM " + fooRef, + newExpr: "SELECT foo.*, (event_op() = 'delete') FROM " + fooRef, + }, + { + oldExpr: "SELECT foo.*, cdc_is_delete() AS was_deleted FROM " + fooRef, + newExpr: "SELECT foo.*, (event_op() = 'delete') AS was_deleted FROM " + fooRef, + }, + { + oldExpr: "SELECT foo.*, cdc_mvcc_timestamp() FROM " + fooRef, + 
newExpr: "SELECT foo.*, crdb_internal_mvcc_timestamp FROM " + fooRef, + }, } { sc, err := ParseChangefeedExpression(tc.oldExpr) require.NoError(t, err) diff --git a/pkg/ccl/changefeedccl/cdceval/expr_eval.go b/pkg/ccl/changefeedccl/cdceval/expr_eval.go index 2d72a8e77cc6..9027e2f0ba0a 100644 --- a/pkg/ccl/changefeedccl/cdceval/expr_eval.go +++ b/pkg/ccl/changefeedccl/cdceval/expr_eval.go @@ -41,8 +41,8 @@ type Evaluator struct { execCfg *sql.ExecutorConfig user username.SQLUsername sessionData sessiondatapb.SessionData - - familyEval map[descpb.FamilyID]*familyEvaluator + withDiff bool + familyEval map[descpb.FamilyID]*familyEvaluator } // familyEvaluator is a responsible for evaluating expressions in CDC @@ -82,6 +82,7 @@ func NewEvaluator( user username.SQLUsername, sd sessiondatapb.SessionData, statementTS hlc.Timestamp, + withDiff bool, ) *Evaluator { return &Evaluator{ sc: sc, @@ -89,6 +90,7 @@ func NewEvaluator( user: user, sessionData: sd, statementTS: statementTS, + withDiff: withDiff, familyEval: make(map[descpb.FamilyID]*familyEvaluator, 1), // usually, just 1 family. } } @@ -101,6 +103,7 @@ func newFamilyEvaluator( user username.SQLUsername, sd sessiondatapb.SessionData, statementTS hlc.Timestamp, + withDiff bool, ) *familyEvaluator { e := familyEvaluator{ targetFamilyID: targetFamilyID, @@ -113,6 +116,8 @@ func newFamilyEvaluator( rowCh: make(chan tree.Datums, 1), } e.rowEvalCtx.startTime = statementTS + e.rowEvalCtx.withDiff = withDiff + // Arrange to be notified when event does not match predicate. 
predicateAsProjection(e.norm) @@ -143,7 +148,9 @@ func (e *Evaluator) Eval( fe, ok := e.familyEval[updatedRow.FamilyID] if !ok { - fe = newFamilyEvaluator(e.sc, updatedRow.FamilyID, e.execCfg, e.user, e.sessionData, e.statementTS) + fe = newFamilyEvaluator( + e.sc, updatedRow.FamilyID, e.execCfg, e.user, e.sessionData, e.statementTS, e.withDiff, + ) e.familyEval[updatedRow.FamilyID] = fe } @@ -183,7 +190,7 @@ func (e *familyEvaluator) eval( } // Setup context. - if err := e.setupContextForRow(ctx, updatedRow); err != nil { + if err := e.setupContextForRow(ctx, updatedRow, prevRow); err != nil { return cdcevent.Row{}, err } @@ -447,9 +454,28 @@ func (e *familyEvaluator) copyPrevRow(prev cdcevent.Row) error { // setupContextForRow configures evaluation context with the provided row // information. -func (e *familyEvaluator) setupContextForRow(ctx context.Context, updated cdcevent.Row) error { +func (e *familyEvaluator) setupContextForRow( + ctx context.Context, updated cdcevent.Row, prevRow cdcevent.Row, +) error { e.rowEvalCtx.ctx = ctx e.rowEvalCtx.updatedRow = updated + + if updated.IsDeleted() { + e.rowEvalCtx.op = eventTypeDelete + } else { + // Insert or update. + if e.rowEvalCtx.withDiff { + if prevRow.IsInitialized() { + e.rowEvalCtx.op = eventTypeUpdate + } else { + e.rowEvalCtx.op = eventTypeInsert + } + } else { + // Without diff option we can't tell insert from update; so, use upsert. 
+ e.rowEvalCtx.op = eventTypeUpsert + } + } + return nil } @@ -478,7 +504,9 @@ func (e *familyEvaluator) closeErr() error { type rowEvalContext struct { ctx context.Context startTime hlc.Timestamp + withDiff bool updatedRow cdcevent.Row + op tree.Datum } // cdcAnnotationAddr is the address used to store relevant information diff --git a/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go b/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go index c5b721d159d4..3d4d23d72597 100644 --- a/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go +++ b/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go @@ -11,6 +11,7 @@ package cdceval import ( "context" "fmt" + "math/rand" "sort" "strconv" "testing" @@ -34,7 +35,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/stretchr/testify/require" ) @@ -145,15 +145,15 @@ $$`) "INSERT INTO foo (a, b) VALUES (2, '2nd test')", "DELETE FROM foo WHERE a=2 AND b='2nd test'", }, - stmt: "SELECT *, cdc_is_delete() FROM foo WHERE 'hello' != 'world'", + stmt: "SELECT *, event_op() = 'delete' AS deleted FROM foo WHERE 'hello' != 'world'", expectMainFamily: []decodeExpectation{ { keyValues: []string{"2nd test", "2"}, - allValues: map[string]string{"a": "2", "b": "2nd test", "e": "inactive", "cdc_is_delete": "false"}, + allValues: map[string]string{"a": "2", "b": "2nd test", "e": "inactive", "deleted": "false"}, }, { keyValues: []string{"2nd test", "2"}, - allValues: map[string]string{"a": "2", "b": "2nd test", "e": "NULL", "cdc_is_delete": "true"}, + allValues: map[string]string{"a": "2", "b": "2nd test", "e": "NULL", "deleted": "true"}, }, }, }, @@ -356,7 +356,7 @@ $$`) "INSERT INTO foo (a, b) VALUES (123, 'select_if')", "DELETE FROM foo where a=123", }, - stmt: "SELECT IF(cdc_is_delete(),'deleted',a::string) AS conditional FROM foo", + stmt: "SELECT IF(event_op() = 
'delete','deleted',a::string) AS conditional FROM foo", expectMainFamily: []decodeExpectation{ { keyValues: []string{"select_if", "123"}, @@ -388,7 +388,7 @@ $$`) actions: []string{ "INSERT INTO foo (a, b, h) VALUES (1, 'hello', 'invisible')", }, - stmt: "SELECT a, tableoid, h FROM foo WHERE crdb_internal_mvcc_timestamp = cdc_mvcc_timestamp()", + stmt: "SELECT a, tableoid, h FROM foo WHERE crdb_internal_mvcc_timestamp > 0", expectMainFamily: []decodeExpectation{ { keyValues: []string{"hello", "1"}, @@ -404,7 +404,7 @@ $$`) actions: []string{ "INSERT INTO foo (a, b, h) VALUES (1, 'hello', 'invisible')", }, - stmt: "SELECT a, tableoid, h FROM foo WHERE crdb_internal_mvcc_timestamp = cdc_mvcc_timestamp()", + stmt: "SELECT a, tableoid, h FROM foo WHERE crdb_internal_mvcc_timestamp > 0", expectErr: `column "h" does not exist`, }, { @@ -496,7 +496,7 @@ $$`) "SELECT x, 'only_some_deleted_values', x::string FROM s", }, actions: []string{"DELETE FROM foo WHERE b='only_some_deleted_values'"}, - stmt: `SELECT * FROM foo WHERE cdc_is_delete() AND (cdc_prev).a % 33 = 0`, + stmt: `SELECT * FROM foo WHERE event_op() = 'delete' AND (cdc_prev).a % 33 = 0`, expectMainFamily: repeatExpectation(decodeExpectation{expectUnwatchedErr: true}, 100), expectOnlyCFamily: func() (expectations []decodeExpectation) { for i := 1; i <= 100; i++ { @@ -722,10 +722,9 @@ func slurpValues(t *testing.T, r cdcevent.Row) map[string]string { } func randEncDatumPrimaryFamily( - t *testing.T, desc catalog.TableDescriptor, + t *testing.T, rng *rand.Rand, desc catalog.TableDescriptor, ) (row rowenc.EncDatumRow) { t.Helper() - rng, _ := randutil.NewTestRand() family, err := catalog.MustFindFamilyByID(desc, 0 /* id */) require.NoError(t, err) @@ -781,8 +780,9 @@ func newEvaluatorWithNormCheck( return nil, err } + const withDiff = true return NewEvaluator(norm.SelectClause, execCfg, username.RootUserName(), - defaultDBSessionData, hlc.Timestamp{}), nil + defaultDBSessionData, hlc.Timestamp{}, withDiff), nil } var 
defaultDBSessionData = sessiondatapb.SessionData{ diff --git a/pkg/ccl/changefeedccl/cdceval/func_resolver_test.go b/pkg/ccl/changefeedccl/cdceval/func_resolver_test.go index c5cb17ebde26..060b04ee8004 100644 --- a/pkg/ccl/changefeedccl/cdceval/func_resolver_test.go +++ b/pkg/ccl/changefeedccl/cdceval/func_resolver_test.go @@ -60,12 +60,12 @@ $$`) }, { testName: "cdc name without schema", - fnName: tree.MakeUnresolvedName("cdc_mvcc_timestamp"), + fnName: tree.MakeUnresolvedName("changefeed_creation_timestamp"), expectedSchema: "public", }, { testName: "uppercase cdc name without schema", - fnName: tree.MakeUnresolvedName("cdc_mVCC_timeStamp"), + fnName: tree.MakeUnresolvedName("changefeed_creATIon_TimeStamp"), expectedSchema: "public", }, { diff --git a/pkg/ccl/changefeedccl/cdceval/functions.go b/pkg/ccl/changefeedccl/cdceval/functions.go index 58c4c5fcd88e..ebd5cccbaadc 100644 --- a/pkg/ccl/changefeedccl/cdceval/functions.go +++ b/pkg/ccl/changefeedccl/cdceval/functions.go @@ -90,38 +90,26 @@ var cdcFunctions = map[string]*tree.ResolvedFunctionDefinition{ //"st_asgeojson", //"st_estimatedextent", - // NB: even though some cdc functions appear to be stable (e.g. cdc_is_delete()), + // NB: even though some cdc functions appear to be stable (e.g. event_op()), // we should not mark custom CDC functions as stable. Doing so will cause // optimizer to (constant) fold this function during optimization step -- something // we definitely don't want to do because we need to evaluate those functions // for each event. 
- "cdc_is_delete": makeCDCBuiltIn( - "cdc_is_delete", + "event_op": makeCDCBuiltIn( + "event_op", tree.Overload{ Types: tree.ParamTypes{}, - ReturnType: tree.FixedReturnType(types.Bool), + ReturnType: tree.FixedReturnType(types.String), Fn: func(ctx context.Context, evalCtx *eval.Context, datums tree.Datums) (tree.Datum, error) { rowEvalCtx := rowEvalContextFromEvalContext(evalCtx) - if rowEvalCtx.updatedRow.IsDeleted() { - return tree.DBoolTrue, nil - } - return tree.DBoolFalse, nil + return rowEvalCtx.op, nil }, - Info: "Returns true if the event is a deletion", + Info: "Returns 'insert', 'update', 'upsert' or 'delete' to describe the type of the operation.", Volatility: volatility.Volatile, }), - "cdc_mvcc_timestamp": cdcTimestampBuiltin( - "cdc_mvcc_timestamp", - "Returns MVCC timestamp of the event", - volatility.Volatile, - types.Decimal, - func(rowEvalCtx *rowEvalContext) hlc.Timestamp { - return rowEvalCtx.updatedRow.MvccTimestamp - }, - ), - "cdc_updated_timestamp": cdcTimestampBuiltin( - "cdc_updated_timestamp", - "Returns schema timestamp of the event", + "event_schema_timestamp": cdcTimestampBuiltin( + "event_schema_timestamp", + "Returns schema timestamp of the event.", volatility.Volatile, types.Decimal, func(rowEvalCtx *rowEvalContext) hlc.Timestamp { @@ -130,7 +118,7 @@ var cdcFunctions = map[string]*tree.ResolvedFunctionDefinition{ ), "changefeed_creation_timestamp": cdcTimestampBuiltin( "changefeed_creation_timestamp", - "Returns changefeed creation time", + "Returns changefeed creation time.", volatility.Stable, types.Decimal, func(rowEvalCtx *rowEvalContext) hlc.Timestamp { @@ -139,6 +127,13 @@ var cdcFunctions = map[string]*tree.ResolvedFunctionDefinition{ ), } +var ( + eventTypeInsert = tree.NewDString("insert") + eventTypeUpdate = tree.NewDString("update") + eventTypeUpsert = tree.NewDString("upsert") + eventTypeDelete = tree.NewDString("delete") +) + const cdcFnCategory = "CDC builtin" var cdcFnProps = &tree.FunctionProperties{ @@ -253,7 
+248,7 @@ func init() { case volatility.Stable: // If the stable function is not on the white list, // then it is blacklisted. - if _, whitelisted := cdcFunctions[fnDef.Name]; !whitelisted { + if _, allowed := cdcFunctions[fnDef.Name]; !allowed { functionDenyList[fnDef.Name] = struct{}{} } } diff --git a/pkg/ccl/changefeedccl/cdceval/functions_test.go b/pkg/ccl/changefeedccl/cdceval/functions_test.go index 0abb6b23a70f..840c091a5199 100644 --- a/pkg/ccl/changefeedccl/cdceval/functions_test.go +++ b/pkg/ccl/changefeedccl/cdceval/functions_test.go @@ -20,6 +20,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" + "github.com/cockroachdb/cockroach/pkg/sql/randgen" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" @@ -74,8 +77,7 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { for fn, preferredOverload := range map[string]preferredFn{ "statement_timestamp": expectTSTZ, "transaction_timestamp": expectTSTZ, - "cdc_mvcc_timestamp": expectHLC, - "cdc_updated_timestamp": expectHLC, + "event_schema_timestamp": expectHLC, "changefeed_creation_timestamp": expectHLC, } { t.Run(fn, func(t *testing.T) { @@ -85,22 +87,21 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { targetTS := rowTS switch fn { - case "cdc_updated_timestamp": + case "event_schema_timestamp": targetTS = schemaTS case "changefeed_creation_timestamp": targetTS = createTS } // We'll run tests against some future time stamp to ensure // that time functions use correct values. 
- testRow := makeEventRow(t, desc, schemaTS, false, rowTS) - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - fmt.Sprintf("SELECT "+ - "%[1]s() AS preferred,"+ // Preferred overload. - "%[1]s():::TIMESTAMPTZ AS tstz,"+ // Force timestamptz overload. - "%[1]s():::TIMESTAMP AS ts,"+ // Force timestamp overload. - "%[1]s():::DECIMAL AS dec,"+ // Force decimal overload. - "%[1]s()::STRING AS str"+ // Casts preferred overload to string. - " FROM foo", fn)) + testRow := makeEventRow(t, desc, schemaTS, false, rowTS, false) + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, fmt.Sprintf("SELECT "+ + "%[1]s() AS preferred,"+ // Preferred overload. + "%[1]s():::TIMESTAMPTZ AS tstz,"+ // Force timestamptz overload. + "%[1]s():::TIMESTAMP AS ts,"+ // Force timestamp overload. + "%[1]s():::DECIMAL AS dec,"+ // Force decimal overload. + "%[1]s()::STRING AS str"+ // Casts preferred overload to string. + " FROM foo", fn)) require.NoError(t, err) defer e.Close() e.statementTS = createTS @@ -132,7 +133,7 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { // should have no bearing on the returned values -- we should see // the same thing we saw before. updatedExpectations = initialExpectations - case "cdc_updated_timestamp": + case "event_schema_timestamp": targetTS = testRow.SchemaTS fallthrough default: @@ -158,10 +159,9 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { // timezone. Since we don't do any special setup with session data, the // default timezone is UTC. We'll use a "strange" timezone of -1h33m from // UTC to test conversion. 
- testRow := makeEventRow(t, desc, s.Clock().Now(), false, futureTS) - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - fmt.Sprintf("SELECT timezone('+01:33:00', '%s'::time) FROM foo", - futureTS.GoTime().Format("15:04:05"))) + testRow := makeEventRow(t, desc, s.Clock().Now(), false, futureTS, false) + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, fmt.Sprintf("SELECT timezone('+01:33:00', '%s'::time) FROM foo", + futureTS.GoTime().Format("15:04:05"))) require.NoError(t, err) defer e.Close() @@ -174,21 +174,73 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { }) }) - t.Run("cdc_is_delete", func(t *testing.T) { + t.Run("event_op", func(t *testing.T) { schemaTS := s.Clock().Now() - testRow := makeEventRow(t, desc, schemaTS, false, s.Clock().Now()) - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - "SELECT cdc_is_delete() FROM foo") - require.NoError(t, err) - defer e.Close() + row := makeEventRow(t, desc, schemaTS, false, s.Clock().Now(), true) + deletedRow := makeEventRow(t, desc, schemaTS, true, s.Clock().Now(), true) + nilRow := cdcevent.Row{} + + for _, tc := range []struct { + op string + row cdcevent.Row + prevRow cdcevent.Row + withDiff bool + expect string + }{ + { + op: "insert", + row: row, + prevRow: nilRow, + withDiff: true, + expect: "insert", + }, + { + op: "update", + row: row, + prevRow: row, + withDiff: true, + expect: "update", + }, + { + // Without diff, we can't tell an update from insert, so we emit upsert. + op: "insert", + row: row, + prevRow: nilRow, + withDiff: false, + expect: "upsert", + }, + { + // Without diff, we can't tell an update from insert, so we emit upsert. 
+ op: "update", + row: row, + prevRow: nilRow, + withDiff: false, + expect: "upsert", + }, + { + op: "delete", + row: deletedRow, + prevRow: row, + withDiff: true, + expect: "delete", + }, + { + op: "delete", + row: deletedRow, + prevRow: row, + withDiff: false, + expect: "delete", + }, + } { + t.Run(fmt.Sprintf("%s/diff=%t", tc.op, tc.withDiff), func(t *testing.T) { + e, err := newEvaluator(&execCfg, &semaCtx, tc.row.EventDescriptor, tc.withDiff, "SELECT event_op() FROM foo") + require.NoError(t, err) + defer e.Close() - for _, expectDelete := range []bool{true, false} { - testRow := makeEventRow(t, desc, schemaTS, expectDelete, s.Clock().Now()) - p, err := e.Eval(ctx, testRow, cdcevent.Row{}) - require.NoError(t, err) - require.Equal(t, - map[string]string{"cdc_is_delete": fmt.Sprintf("%t", expectDelete)}, - slurpValues(t, p)) + p, err := e.Eval(ctx, tc.row, tc.prevRow) + require.NoError(t, err) + require.Equal(t, map[string]string{"event_op": tc.expect}, slurpValues(t, p)) + }) } }) @@ -200,10 +252,9 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { } t.Run("pg_collation_for", func(t *testing.T) { - testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now()) + testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now(), false) - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - `SELECT pg_collation_for('hello' COLLATE de_DE) AS col FROM foo`) + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, `SELECT pg_collation_for('hello' COLLATE de_DE) AS col FROM foo`) require.NoError(t, err) defer e.Close() @@ -214,11 +265,10 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { for _, fn := range []string{"to_json", "to_jsonb"} { t.Run(fn, func(t *testing.T) { - testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now()) + testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now(), false) rowDatums := testRow.EncDatums() - e, err := newEvaluator(&execCfg, 
&semaCtx, testRow.EventDescriptor, - fmt.Sprintf("SELECT %s(a) FROM foo", fn)) + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, fmt.Sprintf("SELECT %s(a) FROM foo", fn)) require.NoError(t, err) defer e.Close() @@ -231,11 +281,10 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { } t.Run("row_to_json", func(t *testing.T) { - testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now()) + testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now(), false) rowDatums := testRow.EncDatums() - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - "SELECT row_to_json(row(a, b, c)) FROM foo") + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, "SELECT row_to_json(row(a, b, c)) FROM foo") require.NoError(t, err) defer e.Close() @@ -251,10 +300,9 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { }) t.Run("jsonb_build_array", func(t *testing.T) { - testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now()) + testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now(), false) rowDatums := testRow.EncDatums() - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - "SELECT jsonb_build_array(a, a, 42) AS three_ints FROM foo") + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, "SELECT jsonb_build_array(a, a, 42) AS three_ints FROM foo") require.NoError(t, err) defer e.Close() @@ -271,10 +319,9 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { }) t.Run("jsonb_build_object", func(t *testing.T) { - testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now()) + testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now(), false) rowDatums := testRow.EncDatums() - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - "SELECT jsonb_build_object('a', a, 'b', b, 'c', c) AS obj FROM foo") + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, 
"SELECT jsonb_build_object('a', a, 'b', b, 'c', c) AS obj FROM foo") require.NoError(t, err) defer e.Close() @@ -293,9 +340,8 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { // These functions have overloads; call the one that's stable overload // (i.e. one that needs to convert types.Any to string). t.Run(fn, func(t *testing.T) { - testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now()) - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - fmt.Sprintf("SELECT %s(42) FROM foo", fn)) + testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now(), false) + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, fmt.Sprintf("SELECT %s(42) FROM foo", fn)) require.NoError(t, err) defer e.Close() @@ -309,10 +355,9 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { // overlaps has many overloads; most of them are immutable, but 1 is stable. t.Run("overlaps", func(t *testing.T) { - testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now()) + testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now(), false) - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - `SELECT overlaps(transaction_timestamp(), interval '0', transaction_timestamp(), interval '-1s') FROM foo`) + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, `SELECT overlaps(transaction_timestamp(), interval '0', transaction_timestamp(), interval '-1s') FROM foo`) require.NoError(t, err) defer e.Close() @@ -324,7 +369,7 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { // Test that cdc specific functions correctly resolve overload, and that an // error is returned when cdc function called with wrong arguments. 
t.Run("cdc function errors", func(t *testing.T) { - testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now()) + testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now(), false) // currently, all cdc functions take no args, so call these functions with // some arguments. rng, _ := randutil.NewTestRand() @@ -343,8 +388,7 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { // Run this test only for CDC specific functions. if def != useDefaultBuiltin && def.Overloads[0].FunctionProperties.Category == cdcFnCategory { t.Run(fn, func(t *testing.T) { - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - fmt.Sprintf("SELECT %s(%s) FROM foo", fn, fnArgs())) + e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, false, fmt.Sprintf("SELECT %s(%s) FROM foo", fn, fnArgs())) require.NoError(t, err) _, err = e.Eval(ctx, testRow, testRow) require.Regexp(t, "unknown signature", err) @@ -360,9 +404,15 @@ func makeEventRow( schemaTS hlc.Timestamp, deleted bool, mvccTS hlc.Timestamp, + includeMVCCCol bool, ) cdcevent.Row { t.Helper() - datums := randEncDatumPrimaryFamily(t, desc) + rng, _ := randutil.NewTestRand() + + datums := randEncDatumPrimaryFamily(t, rng, desc) + if includeMVCCCol { + datums = append(datums, rowenc.EncDatum{Datum: randgen.RandDatum(rng, colinfo.MVCCTimestampColumnType, false)}) + } r := cdcevent.TestingMakeEventRow(desc, 0, datums, deleted) r.SchemaTS = schemaTS r.MvccTimestamp = mvccTS @@ -370,7 +420,11 @@ func makeEventRow( } func newEvaluator( - execCfg *sql.ExecutorConfig, semaCtx *tree.SemaContext, ed *cdcevent.EventDescriptor, expr string, + execCfg *sql.ExecutorConfig, + semaCtx *tree.SemaContext, + ed *cdcevent.EventDescriptor, + withDiff bool, + expr string, ) (*Evaluator, error) { sc, err := ParseChangefeedExpression(expr) if err != nil { @@ -383,5 +437,5 @@ func newEvaluator( return nil, err } return NewEvaluator(norm.SelectClause, execCfg, username.RootUserName(), - 
defaultDBSessionData, execCfg.Clock.Now()), nil + defaultDBSessionData, execCfg.Clock.Now(), withDiff), nil } diff --git a/pkg/ccl/changefeedccl/cdceval/validation_test.go b/pkg/ccl/changefeedccl/cdceval/validation_test.go index 085336eee89a..a4e7a4c33911 100644 --- a/pkg/ccl/changefeedccl/cdceval/validation_test.go +++ b/pkg/ccl/changefeedccl/cdceval/validation_test.go @@ -276,7 +276,7 @@ func TestSelectClauseRequiresPrev(t *testing.T) { { name: "nested call to cdc_prev", desc: descs[`foo`], - stmt: "SELECT jsonb_build_object('op',IF(cdc_is_delete(),'u',IF(row_to_json((cdc_prev).*)::string='null','c','u'))) from foo", + stmt: "SELECT jsonb_build_object('op',IF(event_op() = 'delete','u',IF(row_to_json((cdc_prev).*)::string='null','c','u'))) from foo", requiresPrev: true, }, { diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 593d16301426..8b25add26221 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -7075,7 +7075,7 @@ INSERT INTO foo (a, b, e) VALUES (2, 'two', 'closed'); CREATE CHANGEFEED WITH schema_change_policy='stop' AS SELECT * FROM `+fromClause+` -WHERE e IN ('open', 'closed') AND NOT cdc_is_delete()`) +WHERE e IN ('open', 'closed') AND event_op() != 'delete'`) defer closeFeed(t, feed) assertPayloads(t, feed, []string{ diff --git a/pkg/ccl/changefeedccl/event_processing.go b/pkg/ccl/changefeedccl/event_processing.go index 5046b5e49d6f..f52b47c88b4f 100644 --- a/pkg/ccl/changefeedccl/event_processing.go +++ b/pkg/ccl/changefeedccl/event_processing.go @@ -231,7 +231,7 @@ func newKVEventToRowConsumer( var evaluator *cdceval.Evaluator if spec.Select.Expr != "" { - evaluator, err = newEvaluator(ctx, cfg, spec) + evaluator, err = newEvaluator(ctx, cfg, spec, details.Opts.GetFilters().WithDiff) if err != nil { return nil, err } @@ -260,7 +260,10 @@ func newKVEventToRowConsumer( } func newEvaluator( - ctx context.Context, cfg *sql.ExecutorConfig, spec 
execinfrapb.ChangeAggregatorSpec, + ctx context.Context, + cfg *sql.ExecutorConfig, + spec execinfrapb.ChangeAggregatorSpec, + withDiff bool, ) (*cdceval.Evaluator, error) { sc, err := cdceval.ParseChangefeedExpression(spec.Select.Expr) if err != nil { @@ -290,7 +293,7 @@ func newEvaluator( sd = *spec.Feed.SessionData } - return cdceval.NewEvaluator(sc, cfg, spec.User(), sd, spec.Feed.StatementTime), nil + return cdceval.NewEvaluator(sc, cfg, spec.User(), sd, spec.Feed.StatementTime, withDiff), nil } func (c *kvEventToRowConsumer) topicForEvent(eventMeta cdcevent.Metadata) (TopicDescriptor, error) {