
[sink to cloud storage] output old values to CSV files #10167

Closed
zhangjinpeng87 opened this issue Nov 28, 2023 · 2 comments · Fixed by #10174 or #10542

@zhangjinpeng87
Contributor

Is your feature request related to a problem?

In the upstream-data-repair scenario (described in #10100), if the mistaken operation is an update, the sink-to-cloud-storage CSV protocol cannot generate the undo DML, because for update operations the CSV protocol only records the after-update value.

Describe the feature you'd like

Record the old value for update operations (deletes already record the old value) when the old-value parameter is true, e.g. sink-uri="s3://xxx/xxx?protocol=csv&oldvalue=true". For example, suppose there are the following update operations:

1) update row1 = b
2) update row1 = c

The current CSV protocol generates a CSV file that looks like:

U, db, table, [commit-ts1], rowid=1, value=b
U, db, table, [commit-ts2], rowid=1, value=c

There is no way to generate undo DML for these updates because they lack the old value. My proposal is to add an option that replaces each update operation with a delete and an insert in the CSV protocol, so the CSV file looks like:

D, db, table, [commit-ts1], rowid=1, value=a
I, db, table, [commit-ts1], rowid=1, value=b
D, db, table, [commit-ts2], rowid=1, value=b
I, db, table, [commit-ts2], rowid=1, value=c
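The expansion step can be sketched as follows. This is only a minimal illustration of the proposal, not the actual TiCDC implementation; the event shape and the `expand_event` helper are hypothetical:

```python
# Sketch of expanding a change event into CSV rows under the proposed
# option. The row layout (op, db, table, commit_ts, rowid, value) is the
# simplified form used in the example above.

def expand_event(op, db, table, commit_ts, rowid, old_value=None, new_value=None):
    """Return the CSV rows emitted for one change event."""
    if op == "U":
        # An update becomes a delete of the old row followed by an insert
        # of the new row, both at the same commit-ts, so the old value is
        # preserved in the output.
        return [
            ("D", db, table, commit_ts, rowid, old_value),
            ("I", db, table, commit_ts, rowid, new_value),
        ]
    if op == "I":
        return [("I", db, table, commit_ts, rowid, new_value)]
    if op == "D":
        return [("D", db, table, commit_ts, rowid, old_value)]
    raise ValueError(f"unknown op: {op}")

# The two updates from the example above (row1: a -> b, then b -> c):
rows = []
rows += expand_event("U", "db", "table", "commit-ts1", 1, old_value="a", new_value="b")
rows += expand_event("U", "db", "table", "commit-ts2", 1, old_value="b", new_value="c")
```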

The advantage of this proposal is that we can record the old value for Insert (I), Update (U), and Delete (D) operations. The disadvantage is that the file size grows when there are many update operations.
When generating the undo DML, it can produce the following statements:

DELETE FROM db.table WHERE rowid=1 AND value='c';
INSERT INTO db.table VALUES (1, 'b');
DELETE FROM db.table WHERE rowid=1 AND value='b';
INSERT INTO db.table VALUES (1, 'a');
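Generating the undo DML from such a file amounts to replaying the rows in reverse order and inverting each operation. A sketch under the simplified row layout above (the `undo_dml` helper is hypothetical, and real values would need proper quoting/escaping):

```python
# Sketch: derive undo DML from D/I CSV rows by walking them in reverse.
# Undoing an insert deletes the row; undoing a delete re-inserts it.

def undo_dml(rows):
    stmts = []
    for op, db, table, ts, rowid, value in reversed(rows):
        if op == "I":
            # The row was inserted, so the undo is a delete.
            stmts.append(
                f"DELETE FROM {db}.{table} WHERE rowid={rowid} AND value='{value}';"
            )
        elif op == "D":
            # The row was deleted, so the undo is a re-insert.
            stmts.append(f"INSERT INTO {db}.{table} VALUES ({rowid}, '{value}');")
    return stmts

# The D/I rows from the proposed CSV file above:
rows = [
    ("D", "db", "table", "commit-ts1", 1, "a"),
    ("I", "db", "table", "commit-ts1", 1, "b"),
    ("D", "db", "table", "commit-ts2", 1, "b"),
    ("I", "db", "table", "commit-ts2", 1, "c"),
]
```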

Describe alternatives you've considered

Use two update rows to record the old value and the new value; the generated CSV file for the above example would look like:

U, db, table, [commit-ts1], rowid=1, value=a
U, db, table, [commit-ts1], rowid=1, value=b
U, db, table, [commit-ts2], rowid=1, value=b
U, db, table, [commit-ts2], rowid=1, value=c

But the semantics are unclear when generating the undo DML, since nothing marks which U row is the before image and which is the after image.

Teachability, Documentation, Adoption, Migration Strategy

todo

@zhangjinpeng87 zhangjinpeng87 added the type/feature Issues about a new feature label Nov 28, 2023
@zhangjinpeng87 zhangjinpeng87 self-assigned this Nov 28, 2023
@zhangjinpeng87 zhangjinpeng87 changed the title [sink to cloud storage] Record Old Values for Update Statements [sink to cloud storage] record old values for Update statements Nov 28, 2023
@zhangjinpeng87 zhangjinpeng87 changed the title [sink to cloud storage] record old values for Update statements [sink to cloud storage] output old values to CSV files Nov 29, 2023
@flowbehappy
Collaborator

Another option is using "U-" or "U+", instead of "I" and "D", to represent UPDATE. cc @Benjamin2037

@benmeadowcroft

benmeadowcroft commented Dec 12, 2023

I agree that recording the Update on two separate lines is necessary for us to record the row's data before and after the update. However, I have two concerns. The first is more immediate: the increased complexity this would introduce when using the changefeed to repair erroneous transactions. The second is a future concern: if we ever introduced trigger support in TiDB, there would be a mismatch between triggers firing on an Update versus on a paired Delete and Insert.

Regarding the first concern, recording an Update operation as a pair of Delete and Insert operations makes processing more complex with relational tooling, particularly for scenarios where we are looking to undo erroneous operations.

For example, suppose an application has been executing erroneous Updates against a table (say employee), alongside correct Inserts and Deletes. If the user wishes to reverse the erroneous updates, they need to process the changes to identify which operations to undo.

With the current CSV protocol we could expect a series of changes to be recorded as follows (see https://docs.pingcap.com/tidb/stable/ticdc-csv#definition-of-the-data-format):

"I","employee","hr",433305438660591626,101,"Smith","Bob","2014-06-04","New York"
"U","employee","hr",433305438660591627,101,"Smith","Bob","2015-10-08","Los Angeles"
"D","employee","hr",433305438660591629,101,"Smith","Bob","2017-03-13","Dallas"
"I","employee","hr",433305438660591631,102,"Alex","Alice","2017-03-14","Shanghai"
"U","employee","hr",433305438660591631,102,"Alex","Alice","2018-06-15","Beijing"

With the original proposal in this issue, the CSV file would look like:

"I","employee","hr",433305438660591626,101,"Smith","Bob","2014-06-04","New York"
"D","employee","hr",433305438660591627,101,"Smith","Bob","2014-06-04","New York"
"I","employee","hr",433305438660591627,101,"Smith","Bob","2015-10-08","Los Angeles"
"D","employee","hr",433305438660591629,101,"Smith","Bob","2017-03-13","Dallas"
"I","employee","hr",433305438660591630,102,"Alex","Alice","2017-03-14","Shanghai"
"D","employee","hr",433305438660591631,102,"Alex","Alice","2017-03-14","Shanghai"
"I","employee","hr",433305438660591631,102,"Alex","Alice","2018-06-15","Beijing"

If we separated out the pre- and post-update row data, using U- for the prior data and U for the updated data, then it would look like the following:

"I","employee","hr",433305438660591626,101,"Smith","Bob","2014-06-04","New York"
"U-","employee","hr",433305438660591627,101,"Smith","Bob","2014-06-04","New York"
"U","employee","hr",433305438660591627,101,"Smith","Bob","2015-10-08","Los Angeles"
"D","employee","hr",433305438660591629,101,"Smith","Bob","2017-03-13","Dallas"
"I","employee","hr",433305438660591630,102,"Alex","Alice","2017-03-14","Shanghai"
"U-","employee","hr",433305438660591631,102,"Alex","Alice","2017-03-14","Shanghai"
"U","employee","hr",433305438660591631,102,"Alex","Alice","2018-06-15","Beijing"

Now, with that CSV file loaded into a database table (e.g. employee_cdc), it is much easier (IMO) to reverse the Update operations within a given timestamp range using the second approach:

update employee inner join employee_cdc
    on employee.id = employee_cdc.id -- join on origin table's unique/primary key
set employee.surname   = employee_cdc.surname,
    employee.firstname = employee_cdc.firstname,
    employee.hiredate  = employee_cdc.hiredate,
    employee.location  = employee_cdc.location
where employee_cdc._cdc_op = 'U-' -- the data prior to the erroneous update
  and employee_cdc._cdc_commit_ts >= '…1626'
  and employee_cdc._cdc_commit_ts <= '…1631'
;
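The same point can be made outside SQL: with the U-/U encoding, a consumer can recover each update's before/after images in a single linear pass, with no self-join. A sketch, where the `update_images` helper is hypothetical and the rows follow the simplified CSV layout above with shortened commit timestamps:

```python
# Sketch: pair each "U-" (before-image) row with its "U" (after-image)
# row in one pass. Rows are (op, table, schema, commit_ts, id, surname,
# firstname, hiredate, location), matching the simplified CSV above.

def update_images(rows):
    images, pending = [], {}
    for row in rows:
        op, table, schema, ts, key = row[0], row[1], row[2], row[3], row[4]
        if op == "U-":
            pending[(schema, table, ts, key)] = row  # remember before-image
        elif op == "U":
            before = pending.pop((schema, table, ts, key), None)
            images.append((before, row))
    return images

rows = [
    ("I", "employee", "hr", 626, 101, "Smith", "Bob", "2014-06-04", "New York"),
    ("U-", "employee", "hr", 627, 101, "Smith", "Bob", "2014-06-04", "New York"),
    ("U", "employee", "hr", 627, 101, "Smith", "Bob", "2015-10-08", "Los Angeles"),
    ("D", "employee", "hr", 629, 101, "Smith", "Bob", "2017-03-13", "Dallas"),
    ("I", "employee", "hr", 630, 102, "Alex", "Alice", "2017-03-14", "Shanghai"),
    ("U-", "employee", "hr", 631, 102, "Alex", "Alice", "2017-03-14", "Shanghai"),
    ("U", "employee", "hr", 631, 102, "Alex", "Alice", "2018-06-15", "Beijing"),
]
```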

In contrast, if updates are recorded as Delete and Insert pairs, separating the non-paired inserts and deletes from the paired ones is more complex. In this example it requires an additional self-join to identify the two rows belonging to each update:

with employee_updates as (
  select pre.*
    from employee_changes pre inner join employee_changes post
      on pre._cdc_commit_ts = post._cdc_commit_ts
     and pre.id = post.id
     and pre._cdc_schema = post._cdc_schema
     and pre._cdc_table = post._cdc_table
   where pre._cdc_op = 'D' and post._cdc_op = 'I'
)
update employee inner join employee_updates
    on employee.id = employee_updates.id -- join on origin table's unique/primary key
set employee.surname   = employee_updates.surname,
    employee.firstname = employee_updates.firstname,
    employee.hiredate  = employee_updates.hiredate,
    employee.location  = employee_updates.location
where employee_updates._cdc_commit_ts >= '…1626'  -- the data prior to the erroneous updates
  and employee_updates._cdc_commit_ts <= '…1631'
;
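The extra pairing logic a consumer would need with the plain D/I encoding can be sketched in a few lines (the `pair_updates` helper and row layout are hypothetical, following the simplified CSV example above). Note the bookkeeping needed just to tell updates apart from standalone inserts and deletes:

```python
# Sketch: re-pair D/I rows that share a commit-ts and key to recover
# update operations; leftover rows are standalone deletes/inserts.
# Rows are (op, table, schema, commit_ts, id, ...) as in the example above.

def pair_updates(rows):
    deletes, inserts, pairs = {}, {}, []
    for row in rows:
        op, table, schema, ts, key = row[0], row[1], row[2], row[3], row[4]
        k = (schema, table, ts, key)
        if op == "D":
            if k in inserts:
                pairs.append((row, inserts.pop(k)))  # (before, after)
            else:
                deletes[k] = row
        elif op == "I":
            if k in deletes:
                pairs.append((deletes.pop(k), row))  # (before, after)
            else:
                inserts[k] = row
    # Whatever remains unpaired is a genuine standalone delete or insert.
    return pairs, list(deletes.values()), list(inserts.values())

# The D/I rows from the original proposal's CSV file above:
rows = [
    ("I", "employee", "hr", 626, 101, "Smith", "Bob", "2014-06-04", "New York"),
    ("D", "employee", "hr", 627, 101, "Smith", "Bob", "2014-06-04", "New York"),
    ("I", "employee", "hr", 627, 101, "Smith", "Bob", "2015-10-08", "Los Angeles"),
    ("D", "employee", "hr", 629, 101, "Smith", "Bob", "2017-03-13", "Dallas"),
    ("I", "employee", "hr", 630, 102, "Alex", "Alice", "2017-03-14", "Shanghai"),
    ("D", "employee", "hr", 631, 102, "Alex", "Alice", "2017-03-14", "Shanghai"),
    ("I", "employee", "hr", 631, 102, "Alex", "Alice", "2018-06-15", "Beijing"),
]
```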

Looking at other solutions that offer streaming representation of data changes, the ability to preserve the Update operation in this fashion appears to be a common pattern.

For example, Snowflake streams record an update in their METADATA$ACTION column as a paired Delete and Insert, but also include a METADATA$ISUPDATE column indicating that the Insert or Delete is part of an update. With Databricks change data feeds, when queried via the table_changes function, the tabular results include a _change_type column whose values include update_preimage and update_postimage, providing the before and after row data for an update operation.
