-
Notifications
You must be signed in to change notification settings - Fork 3.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
82562: changefeeccl: Projections and Filters in CDC. r=miretskiy a=miretskiy Add a variant of CHANGEFEED statement that allows specification of predicates and projections. ``` CREATE CHANGEFEED [INTO 'sink'] [WITH opt=val, ...] AS SELECT .... FROM t WHERE ... ``` This changefeed variant can target at most 1 table (and 1 column family) at a time. The expressions used as the projections and filters can be almost any supported expression with some restrictions: * Volatile functions not allowed. * Sub-selects not allowed. * Aggregate and window functions (i.e. functions operating over many rows) not allowed. * Some stable functions, notably functions which return MVCC timestamp, are overridden to return MVCC timestamp of the event. In addition, some CDC specific functions are provided: * cdc_is_delete: returns true if the event is a deletion event. * cdc_prev: returns JSON representation of the previous row state. * cdc_updated_timestamp: returns event update timestamp (usually MVCC timestamp, but can be different if e.g. undergoing schema changes) Additional CDC specific functions will be added in the follow on PRs. Few examples: * Emit all but the deletion events: ``` CREATE CHANGEFEED INTO 'kafka://' AS SELECT * FROM table WHERE NOT cdc_is_delete() ``` * Emit all events that modified `important_col` column: ``` CREATE CHANGEFEED INTO 'kafka://' WITH diff AS SELECT *, cdc_prev() AS previous FROM important_table WHERE important_col != cdc_prev()->'important_col' ``` * Emit few colums, as well as computed expresions: ``` CREATE CHANGEFEED INTO 'kafka://' WITH diff AS SELECT warehouseID, (totalItems - orderedItems) as itemsAvailable FROM warehouse WHERE region='US/east'; ``` When filter expression is specified, changefeed will now consult optimizer so that the set of spans scanned by changefeed can be restricted based on the predicate. For example, given the following table and a changefeed: ``` CREATE TABLE warehouse ( region STRING, warehouseID int, .... PRIMARY KEY (region, warehouseID) ); CREATE CHANGEFEED INTO 'kafka://' WITH diff AS SELECT * FROM warehouse WHERE region='US/east'; ``` The create changefeed will only scan table spans that contain `US/east` region (and ignore all other table spans). --- For foundational work, see: - #81676 - #81249 - #80499 Addresses: - #56949 - #31214 --- Release Notes (enterprise): CHANGEFEED statement now supports general expressions -- predicates and projections. Projections allow customers to emit only the data that they care about, including computed columns, while predicates (i.e. filters) allow them to restrict the data that's emitted only to those events that match the filter. ``` CREATE CHANGEFEED INTO 'kafka://' AS SELECT * FROM t WHERE NOT cdc_is_delete() ``` Co-authored-by: Yevgeniy Miretskiy <[email protected]>
- Loading branch information
Showing
50 changed files
with
2,022 additions
and
706 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
// Copyright 2022 The Cockroach Authors. | ||
// | ||
// Licensed as a CockroachDB Enterprise file under the Cockroach Community | ||
// License (the "License"); you may not use this file except in compliance with | ||
// the License. You may obtain a copy of the License at | ||
// | ||
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt | ||
|
||
package cdceval | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" | ||
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb" | ||
"github.com/cockroachdb/cockroach/pkg/keys" | ||
"github.com/cockroachdb/cockroach/pkg/roachpb" | ||
"github.com/cockroachdb/cockroach/pkg/sql" | ||
"github.com/cockroachdb/cockroach/pkg/sql/catalog" | ||
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" | ||
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" | ||
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval" | ||
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree" | ||
"github.com/cockroachdb/cockroach/pkg/util/hlc" | ||
"github.com/cockroachdb/errors" | ||
) | ||
|
||
// ConstrainPrimaryIndexSpanByFilter attempts to constrain table primary | ||
// index span if changefeed expression (select clause) is specified. | ||
// Returns possibly constrained spans, and a possibly modified (optimized) | ||
// select clause. | ||
func ConstrainPrimaryIndexSpanByFilter( | ||
ctx context.Context, | ||
execCtx sql.JobExecContext, | ||
selectClause string, | ||
descr catalog.TableDescriptor, | ||
target jobspb.ChangefeedTargetSpecification, | ||
includeVirtual bool, | ||
) (_ []roachpb.Span, updatedSelect string, _ error) { | ||
if selectClause == "" { | ||
return nil, "", errors.AssertionFailedf("unexpected empty filter") | ||
} | ||
sc, err := ParseChangefeedExpression(selectClause) | ||
if err != nil { | ||
return nil, "", pgerror.Wrap(err, pgcode.InvalidParameterValue, | ||
"could not parse changefeed expression") | ||
} | ||
|
||
ed, err := newEventDescriptorForTarget(descr, target, schemaTS(execCtx), includeVirtual) | ||
if err != nil { | ||
return nil, "", err | ||
} | ||
|
||
evalCtx := &execCtx.ExtendedEvalContext().Context | ||
spans, remainingFilter, err := constrainSpansBySelectClause( | ||
ctx, execCtx, evalCtx, execCtx.ExecCfg().Codec, sc, ed) | ||
if err != nil { | ||
return nil, "", err | ||
} | ||
|
||
if remainingFilter != nil { | ||
// non-nil remainingFilter implies we had sc.Where clause. | ||
if remainingFilter == tree.DBoolTrue { | ||
sc.Where = nil | ||
} else { | ||
sc.Where.Expr = remainingFilter | ||
} | ||
} | ||
|
||
return spans, AsStringUnredacted(sc), nil | ||
} | ||
|
||
// constrainSpansBySelectClause is a helper that attempts to constrain primary | ||
// index spans by the filter in the select clause. Returns constrained spans, | ||
// along with the remaining filter. | ||
func constrainSpansBySelectClause( | ||
ctx context.Context, | ||
sc sql.SpanConstrainer, | ||
evalCtx *eval.Context, | ||
codec keys.SQLCodec, | ||
selectClause *tree.SelectClause, | ||
ed *cdcevent.EventDescriptor, | ||
) ([]roachpb.Span, tree.Expr, error) { | ||
// Predicate changefeed currently works on a single table only. | ||
// Verify this assumption. | ||
if len(selectClause.From.Tables) != 1 { | ||
return nil, nil, errors.AssertionFailedf( | ||
"expected 1 table expression, found %d", len(selectClause.From.Tables)) | ||
} | ||
|
||
if selectClause.Where == nil { | ||
// Nothing to constrain. | ||
return []roachpb.Span{ed.TableDescriptor().PrimaryIndexSpan(codec)}, nil, nil | ||
} | ||
|
||
tableName := tableNameOrAlias(ed.TableName, selectClause.From.Tables[0]) | ||
semaCtx := newSemaCtx(ed) | ||
return sc.ConstrainPrimaryIndexSpanByExpr( | ||
ctx, sql.BestEffortConstrain, tableName, ed.TableDescriptor(), | ||
evalCtx, semaCtx, selectClause.Where.Expr) | ||
} | ||
|
||
func schemaTS(execCtx sql.JobExecContext) hlc.Timestamp { | ||
if execCtx.Txn() != nil { | ||
return execCtx.Txn().ReadTimestamp() | ||
} | ||
return execCtx.ExecCfg().Clock.Now() | ||
} |
Oops, something went wrong.