Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: Rename and remove CDC functions #96295

Merged
merged 1 commit into from
Feb 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 34 additions & 11 deletions pkg/ccl/changefeedccl/cdceval/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
package cdceval

import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/errors"
)
Expand All @@ -23,18 +25,39 @@ func RewritePreviewExpression(oldExpr *tree.SelectClause) (*tree.SelectClause, e
default:
return true, expr, nil
case *tree.FuncExpr:
// Old style cdc_prev() function returned JSONb object.
// Replace this call with a call to row_to_json((cdc_prev).*)
if n, ok := t.Func.FunctionReference.(*tree.UnresolvedName); ok && n.Parts[0] == "cdc_prev" {
rowToJSON := &tree.FuncExpr{
Func: tree.ResolvableFunctionReference{
FunctionReference: tree.FunDefs["row_to_json"],
},
Exprs: tree.Exprs{
&tree.TupleStar{Expr: tree.NewUnresolvedName("cdc_prev")},
},
if n, ok := t.Func.FunctionReference.(*tree.UnresolvedName); ok {
switch n.Parts[0] {
case "cdc_prev":
// Old style cdc_prev() function returned JSONb object.
// Replace this call with a call to row_to_json((cdc_prev).*)
rowToJSON := &tree.FuncExpr{
Func: tree.ResolvableFunctionReference{
FunctionReference: tree.FunDefs["row_to_json"],
},
Exprs: tree.Exprs{
&tree.TupleStar{Expr: tree.NewUnresolvedName("cdc_prev")},
},
}
return true, rowToJSON, nil
case "cdc_is_delete":
// Old style cdc_is_delete returned boolean; use event_op instead.
newExpr, err := parser.ParseExpr("(event_op() = 'delete')")
if err != nil {
return false, expr, err
}
return true, newExpr, nil
case "cdc_mvcc_timestamp":
// Old cdc_mvcc_timestamp() function gets replaced with crdb_internal_mvcc_timestamp column.
return true, tree.NewUnresolvedName(colinfo.MVCCTimestampColumnName), nil
case "cdc_updated_timestamp":
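// Old cdc_updated_timestamp gets replaced with event_schema_timestamp.
// NOTE(review): the builtin is registered in cdcFunctions as "event_schema_timestamp",
// but the reference below uses "changefeed_event_schema_timestamp" -- confirm this name resolves.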
newExpr := &tree.FuncExpr{
Func: tree.ResolvableFunctionReference{
FunctionReference: tree.NewUnresolvedName("changefeed_event_schema_timestamp"),
},
}
return true, newExpr, nil
}
return true, rowToJSON, nil
}
}
return true, expr, nil
Expand Down
12 changes: 12 additions & 0 deletions pkg/ccl/changefeedccl/cdceval/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,18 @@ func TestCompatRewrite(t *testing.T) {
oldExpr: "SELECT foo.*, cdc_prev() FROM " + fooRef + " WHERE cdc_prev()->>'field' = 'blah'",
newExpr: "SELECT foo.*, row_to_json((cdc_prev).*) FROM " + fooRef + " WHERE (row_to_json((cdc_prev).*)->>'field') = 'blah'",
},
{
oldExpr: "SELECT foo.*, cdc_is_delete() FROM " + fooRef,
newExpr: "SELECT foo.*, (event_op() = 'delete') FROM " + fooRef,
},
{
oldExpr: "SELECT foo.*, cdc_is_delete() AS was_deleted FROM " + fooRef,
newExpr: "SELECT foo.*, (event_op() = 'delete') AS was_deleted FROM " + fooRef,
},
{
oldExpr: "SELECT foo.*, cdc_mvcc_timestamp() FROM " + fooRef,
newExpr: "SELECT foo.*, crdb_internal_mvcc_timestamp FROM " + fooRef,
},
} {
sc, err := ParseChangefeedExpression(tc.oldExpr)
require.NoError(t, err)
Expand Down
38 changes: 33 additions & 5 deletions pkg/ccl/changefeedccl/cdceval/expr_eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ type Evaluator struct {
execCfg *sql.ExecutorConfig
user username.SQLUsername
sessionData sessiondatapb.SessionData

familyEval map[descpb.FamilyID]*familyEvaluator
withDiff bool
familyEval map[descpb.FamilyID]*familyEvaluator
}

// familyEvaluator is responsible for evaluating expressions in CDC
Expand Down Expand Up @@ -82,13 +82,15 @@ func NewEvaluator(
user username.SQLUsername,
sd sessiondatapb.SessionData,
statementTS hlc.Timestamp,
withDiff bool,
) *Evaluator {
return &Evaluator{
sc: sc,
execCfg: execCfg,
user: user,
sessionData: sd,
statementTS: statementTS,
withDiff: withDiff,
familyEval: make(map[descpb.FamilyID]*familyEvaluator, 1), // usually, just 1 family.
}
}
Expand All @@ -101,6 +103,7 @@ func newFamilyEvaluator(
user username.SQLUsername,
sd sessiondatapb.SessionData,
statementTS hlc.Timestamp,
withDiff bool,
) *familyEvaluator {
e := familyEvaluator{
targetFamilyID: targetFamilyID,
Expand All @@ -113,6 +116,8 @@ func newFamilyEvaluator(
rowCh: make(chan tree.Datums, 1),
}
e.rowEvalCtx.startTime = statementTS
e.rowEvalCtx.withDiff = withDiff

// Arrange to be notified when event does not match predicate.
predicateAsProjection(e.norm)

Expand Down Expand Up @@ -143,7 +148,9 @@ func (e *Evaluator) Eval(

fe, ok := e.familyEval[updatedRow.FamilyID]
if !ok {
fe = newFamilyEvaluator(e.sc, updatedRow.FamilyID, e.execCfg, e.user, e.sessionData, e.statementTS)
fe = newFamilyEvaluator(
e.sc, updatedRow.FamilyID, e.execCfg, e.user, e.sessionData, e.statementTS, e.withDiff,
)
e.familyEval[updatedRow.FamilyID] = fe
}

Expand Down Expand Up @@ -183,7 +190,7 @@ func (e *familyEvaluator) eval(
}

// Setup context.
if err := e.setupContextForRow(ctx, updatedRow); err != nil {
if err := e.setupContextForRow(ctx, updatedRow, prevRow); err != nil {
return cdcevent.Row{}, err
}

Expand Down Expand Up @@ -447,9 +454,28 @@ func (e *familyEvaluator) copyPrevRow(prev cdcevent.Row) error {

// setupContextForRow configures evaluation context with the provided row
// information.
func (e *familyEvaluator) setupContextForRow(ctx context.Context, updated cdcevent.Row) error {
// setupContextForRow records the per-row state on the evaluation context:
// the Go context, the updated row, and the event operation type that
// event_op() reports for this row.
func (e *familyEvaluator) setupContextForRow(
	ctx context.Context, updated cdcevent.Row, prevRow cdcevent.Row,
) error {
	rc := &e.rowEvalCtx
	rc.ctx = ctx
	rc.updatedRow = updated

	// Classify the event. Cases are checked in order, so a deleted row is
	// always reported as a delete regardless of the diff setting.
	switch {
	case updated.IsDeleted():
		rc.op = eventTypeDelete
	case !rc.withDiff:
		// Without diff option we can't tell insert from update; so, use upsert.
		rc.op = eventTypeUpsert
	case prevRow.IsInitialized():
		// A previous row value exists, so this is an update.
		rc.op = eventTypeUpdate
	default:
		// No previous row value: treat as insert.
		rc.op = eventTypeInsert
	}

	return nil
}

Expand Down Expand Up @@ -478,7 +504,9 @@ func (e *familyEvaluator) closeErr() error {
// rowEvalContext holds the per-row state that CDC builtin functions read
// while an expression is evaluated.
type rowEvalContext struct {
// ctx is the context supplied for the row currently being evaluated.
ctx context.Context
// startTime is set from the statement timestamp given to the family evaluator.
startTime hlc.Timestamp
// withDiff records whether the evaluator was created with the diff option;
// it controls whether inserts and updates can be told apart.
withDiff bool
// updatedRow is the row currently being evaluated.
updatedRow cdcevent.Row
// op is the event operation type datum for the current row
// (one of the eventType* values); it is what event_op() returns.
op tree.Datum
}

// cdcAnnotationAddr is the address used to store relevant information
Expand Down
22 changes: 11 additions & 11 deletions pkg/ccl/changefeedccl/cdceval/expr_eval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package cdceval
import (
"context"
"fmt"
"math/rand"
"sort"
"strconv"
"testing"
Expand All @@ -34,7 +35,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -145,15 +145,15 @@ $$`)
"INSERT INTO foo (a, b) VALUES (2, '2nd test')",
"DELETE FROM foo WHERE a=2 AND b='2nd test'",
},
stmt: "SELECT *, cdc_is_delete() FROM foo WHERE 'hello' != 'world'",
stmt: "SELECT *, event_op() = 'delete' AS deleted FROM foo WHERE 'hello' != 'world'",
expectMainFamily: []decodeExpectation{
{
keyValues: []string{"2nd test", "2"},
allValues: map[string]string{"a": "2", "b": "2nd test", "e": "inactive", "cdc_is_delete": "false"},
allValues: map[string]string{"a": "2", "b": "2nd test", "e": "inactive", "deleted": "false"},
},
{
keyValues: []string{"2nd test", "2"},
allValues: map[string]string{"a": "2", "b": "2nd test", "e": "NULL", "cdc_is_delete": "true"},
allValues: map[string]string{"a": "2", "b": "2nd test", "e": "NULL", "deleted": "true"},
},
},
},
Expand Down Expand Up @@ -356,7 +356,7 @@ $$`)
"INSERT INTO foo (a, b) VALUES (123, 'select_if')",
"DELETE FROM foo where a=123",
},
stmt: "SELECT IF(cdc_is_delete(),'deleted',a::string) AS conditional FROM foo",
stmt: "SELECT IF(event_op() = 'delete','deleted',a::string) AS conditional FROM foo",
expectMainFamily: []decodeExpectation{
{
keyValues: []string{"select_if", "123"},
Expand Down Expand Up @@ -388,7 +388,7 @@ $$`)
actions: []string{
"INSERT INTO foo (a, b, h) VALUES (1, 'hello', 'invisible')",
},
stmt: "SELECT a, tableoid, h FROM foo WHERE crdb_internal_mvcc_timestamp = cdc_mvcc_timestamp()",
stmt: "SELECT a, tableoid, h FROM foo WHERE crdb_internal_mvcc_timestamp > 0",
expectMainFamily: []decodeExpectation{
{
keyValues: []string{"hello", "1"},
Expand All @@ -404,7 +404,7 @@ $$`)
actions: []string{
"INSERT INTO foo (a, b, h) VALUES (1, 'hello', 'invisible')",
},
stmt: "SELECT a, tableoid, h FROM foo WHERE crdb_internal_mvcc_timestamp = cdc_mvcc_timestamp()",
stmt: "SELECT a, tableoid, h FROM foo WHERE crdb_internal_mvcc_timestamp > 0",
expectErr: `column "h" does not exist`,
},
{
Expand Down Expand Up @@ -496,7 +496,7 @@ $$`)
"SELECT x, 'only_some_deleted_values', x::string FROM s",
},
actions: []string{"DELETE FROM foo WHERE b='only_some_deleted_values'"},
stmt: `SELECT * FROM foo WHERE cdc_is_delete() AND (cdc_prev).a % 33 = 0`,
stmt: `SELECT * FROM foo WHERE event_op() = 'delete' AND (cdc_prev).a % 33 = 0`,
expectMainFamily: repeatExpectation(decodeExpectation{expectUnwatchedErr: true}, 100),
expectOnlyCFamily: func() (expectations []decodeExpectation) {
for i := 1; i <= 100; i++ {
Expand Down Expand Up @@ -722,10 +722,9 @@ func slurpValues(t *testing.T, r cdcevent.Row) map[string]string {
}

func randEncDatumPrimaryFamily(
t *testing.T, desc catalog.TableDescriptor,
t *testing.T, rng *rand.Rand, desc catalog.TableDescriptor,
) (row rowenc.EncDatumRow) {
t.Helper()
rng, _ := randutil.NewTestRand()

family, err := catalog.MustFindFamilyByID(desc, 0 /* id */)
require.NoError(t, err)
Expand Down Expand Up @@ -781,8 +780,9 @@ func newEvaluatorWithNormCheck(
return nil, err
}

const withDiff = true
return NewEvaluator(norm.SelectClause, execCfg, username.RootUserName(),
defaultDBSessionData, hlc.Timestamp{}), nil
defaultDBSessionData, hlc.Timestamp{}, withDiff), nil
}

var defaultDBSessionData = sessiondatapb.SessionData{
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/cdceval/func_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ $$`)
},
{
testName: "cdc name without schema",
fnName: tree.MakeUnresolvedName("cdc_mvcc_timestamp"),
fnName: tree.MakeUnresolvedName("changefeed_creation_timestamp"),
expectedSchema: "public",
},
{
testName: "uppercase cdc name without schema",
fnName: tree.MakeUnresolvedName("cdc_mVCC_timeStamp"),
fnName: tree.MakeUnresolvedName("changefeed_creATIon_TimeStamp"),
expectedSchema: "public",
},
{
Expand Down
41 changes: 18 additions & 23 deletions pkg/ccl/changefeedccl/cdceval/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,38 +90,26 @@ var cdcFunctions = map[string]*tree.ResolvedFunctionDefinition{
//"st_asgeojson",
//"st_estimatedextent",

// NB: even though some cdc functions appear to be stable (e.g. cdc_is_delete()),
// NB: even though some cdc functions appear to be stable (e.g. event_op()),
// we should not mark custom CDC functions as stable. Doing so will cause
// optimizer to (constant) fold this function during optimization step -- something
// we definitely don't want to do because we need to evaluate those functions
// for each event.
"cdc_is_delete": makeCDCBuiltIn(
"cdc_is_delete",
"event_op": makeCDCBuiltIn(
"event_op",
tree.Overload{
Types: tree.ParamTypes{},
ReturnType: tree.FixedReturnType(types.Bool),
ReturnType: tree.FixedReturnType(types.String),
Fn: func(ctx context.Context, evalCtx *eval.Context, datums tree.Datums) (tree.Datum, error) {
rowEvalCtx := rowEvalContextFromEvalContext(evalCtx)
if rowEvalCtx.updatedRow.IsDeleted() {
return tree.DBoolTrue, nil
}
return tree.DBoolFalse, nil
return rowEvalCtx.op, nil
},
Info: "Returns true if the event is a deletion",
Info: "Returns 'insert', 'update', 'upsert' or 'delete' to describe the type of the operation.",
Volatility: volatility.Volatile,
}),
"cdc_mvcc_timestamp": cdcTimestampBuiltin(
"cdc_mvcc_timestamp",
"Returns MVCC timestamp of the event",
volatility.Volatile,
types.Decimal,
func(rowEvalCtx *rowEvalContext) hlc.Timestamp {
return rowEvalCtx.updatedRow.MvccTimestamp
},
),
"cdc_updated_timestamp": cdcTimestampBuiltin(
"cdc_updated_timestamp",
"Returns schema timestamp of the event",
"event_schema_timestamp": cdcTimestampBuiltin(
"event_schema_timestamp",
"Returns schema timestamp of the event.",
volatility.Volatile,
types.Decimal,
func(rowEvalCtx *rowEvalContext) hlc.Timestamp {
Expand All @@ -130,7 +118,7 @@ var cdcFunctions = map[string]*tree.ResolvedFunctionDefinition{
),
"changefeed_creation_timestamp": cdcTimestampBuiltin(
"changefeed_creation_timestamp",
"Returns changefeed creation time",
"Returns changefeed creation time.",
volatility.Stable,
types.Decimal,
func(rowEvalCtx *rowEvalContext) hlc.Timestamp {
Expand All @@ -139,6 +127,13 @@ var cdcFunctions = map[string]*tree.ResolvedFunctionDefinition{
),
}

// Event operation type datums. One of these is stored in rowEvalContext.op
// for each row and returned by the event_op() builtin.
var (
// eventTypeInsert: used with the diff option when no previous row value is present.
eventTypeInsert = tree.NewDString("insert")
// eventTypeUpdate: used with the diff option when a previous row value is present.
eventTypeUpdate = tree.NewDString("update")
// eventTypeUpsert: used without the diff option, where insert and update
// cannot be distinguished.
eventTypeUpsert = tree.NewDString("upsert")
// eventTypeDelete: used when the updated row is a deletion.
eventTypeDelete = tree.NewDString("delete")
)

const cdcFnCategory = "CDC builtin"

var cdcFnProps = &tree.FunctionProperties{
Expand Down Expand Up @@ -253,7 +248,7 @@ func init() {
case volatility.Stable:
// If the stable function is not on the allow list (cdcFunctions),
// then it is added to the deny list.
if _, whitelisted := cdcFunctions[fnDef.Name]; !whitelisted {
if _, allowed := cdcFunctions[fnDef.Name]; !allowed {
functionDenyList[fnDef.Name] = struct{}{}
}
}
Expand Down
Loading