CDC: Support Filters for Changefeeds #56949
Comments
Hi @amruss, I've guessed the C-ategory of your issue and suitably labeled it. Please re-label if inaccurate. 🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is otan.
Did get a request from a CEA customer to allow filtering based in part on the SQL user that was responsible for the change. Might also be nice to have the SQL application name available as a predicate, too.
@amruss - very interested in this
Hi @bryanhuntesl this ended up just under the cutline for 21.2, so we probably won't have something comprehensive here for the next release. That being said we're going to do scoping/validation here over the next few months. What does the use case look like for you?
Hi - curious about a status for this? As a customer with regulated data (PCI, PII), it's super beneficial to have columns filtered right at the start when using CDC to populate a downstream pipeline.
PCI/PII didn't even occur to me when I brought this up, but yes that's an obvious case where it would be essential.
82562: changefeedccl: Projections and Filters in CDC. r=miretskiy a=miretskiy

Add a variant of CHANGEFEED statement that allows specification of predicates and projections.

```
CREATE CHANGEFEED [INTO 'sink'] [WITH opt=val, ...]
AS SELECT .... FROM t WHERE ...
```

This changefeed variant can target at most 1 table (and 1 column family) at a time. The expressions used as the projections and filters can be almost any supported expression with some restrictions:
* Volatile functions not allowed.
* Sub-selects not allowed.
* Aggregate and window functions (i.e. functions operating over many rows) not allowed.
* Some stable functions, notably functions which return MVCC timestamp, are overridden to return MVCC timestamp of the event.

In addition, some CDC specific functions are provided:
* cdc_is_delete: returns true if the event is a deletion event.
* cdc_prev: returns JSON representation of the previous row state.
* cdc_updated_timestamp: returns event update timestamp (usually MVCC timestamp, but can be different if e.g. undergoing schema changes)

Additional CDC specific functions will be added in the follow-on PRs.

A few examples:

* Emit all but the deletion events:
```
CREATE CHANGEFEED INTO 'kafka://'
AS SELECT * FROM table
WHERE NOT cdc_is_delete()
```

* Emit all events that modified `important_col` column:
```
CREATE CHANGEFEED INTO 'kafka://' WITH diff
AS SELECT *, cdc_prev() AS previous
FROM important_table
WHERE important_col != cdc_prev()->'important_col'
```

* Emit a few columns, as well as computed expressions:
```
CREATE CHANGEFEED INTO 'kafka://' WITH diff
AS SELECT warehouseID, (totalItems - orderedItems) as itemsAvailable
FROM warehouse
WHERE region='US/east';
```

When a filter expression is specified, the changefeed will now consult the optimizer so that the set of spans scanned by the changefeed can be restricted based on the predicate.

For example, given the following table and a changefeed:
```
CREATE TABLE warehouse (
  region STRING,
  warehouseID int,
  ....
  PRIMARY KEY (region, warehouseID)
);

CREATE CHANGEFEED INTO 'kafka://' WITH diff
AS SELECT * FROM warehouse WHERE region='US/east';
```

The create changefeed will only scan table spans that contain `US/east` region (and ignore all other table spans).

---

For foundational work, see:
- #81676
- #81249
- #80499

Addresses:
- #56949
- #31214

---

Release Notes (enterprise): CHANGEFEED statement now supports general expressions -- predicates and projections. Projections allow customers to emit only the data that they care about, including computed columns, while predicates (i.e. filters) allow them to restrict the data that's emitted only to those events that match the filter.

```
CREATE CHANGEFEED INTO 'kafka://' AS SELECT * FROM t WHERE NOT cdc_is_delete()
```

Co-authored-by: Yevgeniy Miretskiy <[email protected]>
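For the regulated-data (PCI/PII) case raised earlier in this thread, a projection might look roughly like the sketch below. The `customers` table and its columns are hypothetical, `'kafka://'` is the same placeholder sink used in the PR description, and `cdc_is_delete()` is taken from that description as written; the function names in the released documentation may differ.

```sql
-- Hypothetical table for illustration only; not from the original issue.
CREATE TABLE customers (
  id UUID PRIMARY KEY,
  email STRING,          -- PII: should not reach the downstream pipeline
  card_number STRING,    -- PCI: should not reach the downstream pipeline
  region STRING,
  signup_date DATE
);

-- Emit only the non-sensitive columns, and skip deletion events.
-- The projection lists columns explicitly rather than using SELECT *,
-- so a sensitive column added later is not emitted by accident.
CREATE CHANGEFEED INTO 'kafka://'
AS SELECT id, region, signup_date
FROM customers
WHERE NOT cdc_is_delete();
```

Because the sensitive columns never appear in the projection, they are dropped at the source instead of being filtered in the downstream consumer.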
This is now available in preview. https://www.cockroachlabs.com/docs/dev/cdc-transformations.html
Haha no way! Thanks for implementing this!
On Wed, 7 Dec 2022, 17:47 Aaron Zinger wrote:
Closed #56949 as completed.
We plan to remove the preview label in 23.1, coming out in a few months!
A way to specify, in the changefeed syntax, a filter on what is emitted. It should support filtering by partition, so that only data from the selected partition is emitted in the changefeed.
UX TBD. Alternatively, we could allow a broader set of filters.
Changefeed should fail when partitions are changed.
gz#7834
Jira issue: CRDB-2889