
[HUDI-3478] Support CDC for Spark in Hudi #5885

Closed
wants to merge 7 commits

Conversation

YannByron (Contributor)

Tips

What is the purpose of the pull request

(For example: This pull request adds quick-start document.)

Brief change log

(for example:)

  • Modify AnnotationLocation checkstyle rule in checkstyle.xml

Verify this pull request

(Please pick either of the following options)

This pull request is a trivial rework / code cleanup without any test coverage.

(or)

This pull request is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end.
  • Added HoodieClientWriteTest to verify the change.
  • Manually verified the change by running a job locally.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@YannByron YannByron marked this pull request as draft June 16, 2022 11:59
@YannByron YannByron marked this pull request as ready for review June 17, 2022 07:14
@YannByron (Contributor Author)

@hudi-bot run azure

@YannByron (Contributor Author) commented Jun 20, 2022

@vinothchandar please review this. The case where cdc.supplemental.logging is false, and the SparkSQL syntax, may be supported in subsequent commits or in another PR.

@YannByron (Contributor Author)

Update:
Support CDC when cdc.supplemental.logging is disabled. In this case, the CDC block will only persist the op and record_key fields.
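For illustration, a minimal sketch of what such a reduced CDC payload could look like as an Avro record; the schema literal and class name are hypothetical, and only the op and record_key field names come from the change described above:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

// Hypothetical sketch: with cdc.supplemental.logging=false, the CDC block
// carries only the operation and the record key; before/after images would
// have to be reconstructed at read time instead of being read back directly.
public class MinimalCdcPayloadSketch {
  static final Schema MINIMAL_CDC_SCHEMA = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"MinimalCdc\",\"fields\":["
          + "{\"name\":\"op\",\"type\":\"string\"},"
          + "{\"name\":\"record_key\",\"type\":\"string\"}]}");

  static GenericData.Record minimalCdcRecord(String op, String recordKey) {
    GenericData.Record rec = new GenericData.Record(MINIMAL_CDC_SCHEMA);
    rec.put("op", op);                // e.g. "i", "u" or "d"
    rec.put("record_key", recordKey); // key of the changed record
    return rec;
  }
}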

@vinothchandar (Member)

@YannByron made one pass to understand the file changes. Will start the detailed review next

@prasannarajaperumal (Contributor)

Hey @YannByron,

Thanks for this PR and a well-written RFC-51.
Overall I agree with the high-level direction. I will do the code review soon; I have a question before that.

Should we introduce a new concept (CDC) here on Hudi tables? I think this should be a sub-mode of Incremental Query.
For illustration, suppose we have something like the following modes for incremental query (change stream), sketched as an enum below:

  • LATEST_STATE_INSERT_DELETE_KEYS (entire row state for all inserted keys and empty delete keys?)
  • LATEST_STATE_ONLY_INSERT_KEYS
  • MIN_STATE_CHANGE_INSERT_DELETE_KEYS (only the columns that changed; consolidate multiple inserts/deletes, or remove data that was inserted and deleted within the time range)
  • ALL_STATE_CHANGES_INSERT_DELETE_KEYS (include every single change made to the key)
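
For illustration only, those modes as a hypothetical Java enum (the constant names mirror the list above; the enum is not part of this PR):

// Hypothetical sketch of the proposed incremental-query modes as an enum;
// the names come from the list above, the enum itself is not in this PR.
public enum IncrementalQueryMode {
  LATEST_STATE_INSERT_DELETE_KEYS,     // entire row state for inserts, empty delete keys
  LATEST_STATE_ONLY_INSERT_KEYS,       // entire row state, inserted keys only
  MIN_STATE_CHANGE_INSERT_DELETE_KEYS, // only changed columns, consolidated over the range
  ALL_STATE_CHANGES_INSERT_DELETE_KEYS // every single change made to the key
}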

I think read-schema changes for the CDC-style incremental queries could be a challenge.

The reasons I think of converging the incremental queries with RFC-51:

  • It removes the limitation of tracking deletes across compaction boundaries for incremental queries.
  • I think it just makes sense to track, by default for all Hudi tables, the data we track when "cdc.supplemental.logging=false". Having this data stored efficiently for point lookups will help with record merging as well, I suppose?

@YannByron What do you think? (cc @vinothchandar )

Cheers
Prasanna

@YannByron (Contributor Author) commented Jul 17, 2022

Hey @prasannarajaperumal, thank you very much for reviewing this.

CDC is not a new concept; it is a common database concept. So I think it's better to distinguish CDC from Incremental Query. Some reasons:

  • CDC is better known than incremental query; incremental query is defined only by Hudi.
  • Unlike Incremental Query and Snapshot Query, CDC has its own output format, in which every record has op, ts_ms, before, and after fields (illustrated below).
  • According to RFC-51, CDC has its own read and write logic. We have to persist some extra information for CDC when data is written to Hudi.
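
For illustration, one way such an output row could be modeled; only the four field names come from the bullet above, while the Java types and the JSON encoding of before/after are assumptions:

// Hypothetical model of a CDC output row; only the four field names are from
// the comment above, the types and the JSON pre-/post-images are assumptions.
public class CdcRowSketch {
  public String op;      // "i" (insert), "u" (update) or "d" (delete)
  public long tsMs;      // commit timestamp of the change, epoch millis
  public String before;  // pre-image of the row, e.g. JSON; null for inserts
  public String after;   // post-image of the row, e.g. JSON; null for deletes

  // Example: an update that changes price from 10 to 12 for id=1.
  public static CdcRowSketch exampleUpdate() {
    CdcRowSketch r = new CdcRowSketch();
    r.op = "u";
    r.tsMs = 1658304000000L;
    r.before = "{\"id\":1,\"price\":10}";
    r.after = "{\"id\":1,\"price\":12}";
    return r;
  }
}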

Looking forward to your reply.

@prasannarajaperumal (Contributor) commented Jul 18, 2022

@YannByron

I understand CDC is a database concept. My point was that incremental query is also just a form of CDC if you think about how it is used. Yes, the schema is different based on the mode of the incremental query. I believe we can unify the current CDC proposal and the incremental query feature to make it simple for users consuming change streams out of a Hudi table.

We can call this unified feature incremental query, CDC, or Change Streams (I am not hung up on the name).

@@ -102,6 +117,15 @@
protected Map<String, HoodieRecord<T>> keyToNewRecords;
protected Set<String> writtenRecordKeys;
protected HoodieFileWriter<IndexedRecord> fileWriter;
// a flag that indicates whether to write the change data out to a CDC log file.
protected boolean cdcEnabled = false;
Contributor:

Create a sub-class of HoodieAppendHandle, HoodieChangeTrackingAppendHandle, and move all the code related to persisting row-level change-tracking metadata to the subclass. I prefer naming all methods/parameters changeTracking instead of CDC: CDC is a feature; ChangeTracking is the action you do during the write.

Contributor Author:

I think you mean HoodieChangeTrackingMergeHandle?

Contributor Author:

Hi @prasannarajaperumal,
I tried creating the sub-classes HoodieChangeTrackingMergeHandle, HoodieChangeTrackingSortedMergeHandle, and HoodieChangeTrackingConcatHandle, and adding the logic to decide whether a HoodieChangeTrackingXXXHandle should be created at every place where HoodieMergeHandle and the other classes were created before. I think that may be less clear.

* Relative path of the cdc file that stores the CDC data.
*/
@Nullable
private String cdcPath;
Contributor:

ChangeTrackingStat

Contributor Author:

Maybe changeTrackingPath? After all, it is a file path, not a stat.

Member:

Do these new fields evolve well? I.e., are they backward compatible with existing write stats that lack these new fields?

Contributor Author:

Let me test the case of using an old Hudi version to query tables created by this branch.

if (cdcEnabled) {
if (indexedRecord.isPresent()) {
GenericRecord record = (GenericRecord) indexedRecord.get();
cdcData.add(cdcRecord(CDCOperationEnum.UPDATE, hoodieRecord.getRecordKey(), hoodieRecord.getPartitionPath(),
Contributor:

We will be holding the record data in memory until the handle is closed when supplemental logging is enabled. Any side effects to be cautious about?
We deflate the actual record once it's written to the file, and the bloom filter calculation happens after; would there be significant memory pressure if we still hold on to the data for CDC, and how do we handle this?

Contributor Author:

IMO, it's OK.
A base parquet file is about 128M in the most common cases. Even if all the records are updated, cdcData will take less than about 300M of memory. And if the workload is heavy, users can increase the memory of the workers.
But if we are worried about this, we can use hudi.common.util.collection.ExternalSpillableMap instead.

Member:

In general, with all the Java/JVM overhead, I think it'll be comfortably more than 300M. Can we use the spillable map here instead, in this PR?

Contributor Author:

OK, I will use hudi.common.util.collection.ExternalSpillableMap here instead.
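
A hedged sketch of that swap, modeled on existing usages of the spillable map in Hudi write handles; the constructor arguments and the HoodieAvroPayload wrapper are assumptions (the map requires Serializable keys and values, so a raw GenericRecord would not fit):

import java.io.IOException;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;

// Sketch: buffer CDC records in a spillable map keyed by record key, so that
// anything beyond the in-memory budget spills to local disk instead of
// pressuring the JVM heap. Constructor shape follows existing usages.
class CdcBufferSketch {
  static ExternalSpillableMap<String, HoodieAvroPayload> newCdcBuffer(
      long maxInMemoryBytes, String spillableMapBasePath) throws IOException {
    return new ExternalSpillableMap<>(
        maxInMemoryBytes,              // in-memory size budget before spilling
        spillableMapBasePath,          // local directory for the spill file
        new DefaultSizeEstimator<>(),  // key size estimator
        new DefaultSizeEstimator<>()); // value size estimator
  }
}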

public enum CDCFileTypeEnum {

CDC_LOG_FILE,
ADD_BASE_File,
Contributor:

s/ADD_BASE_File/ADD_BASE_FILE

* Parse HoodieWriteStat, determine which type the file is and what strategy should be used to parse CDC data.
* Then build a [[ChangeFileForSingleFileGroupAndCommit]] object.
*/
private def parseWriteStat(
Contributor:

Does it make sense to generalize this out of Spark and make the logic to identify the different CDC types and load them common to all clients?

Contributor Author:

Yes, let me move this to a common place.

@YannByron (Contributor Author)

@prasannarajaperumal Got it. In some respects, incremental query can be considered a kind of CDC: it just returns the data inserted/updated after a certain point, uses the normal format, and only cares about the new values rather than the old values.

@vinothchandar (Member) commented Jul 20, 2022

+1 on doing this as part of the incremental query. CDC is a database concept, but not a query per se; an Incremental Query actually executes SQL on the CDC stream, so they are actually different. CDC refers to the mechanism of capturing changes from a database log consistently, in order. That's all.

I'd rather not introduce a new query type and take on the ongoing overhead of explaining incremental vs CDC queries. There are already four query types: snapshot, read optimized, point in time, incremental.

@vinothchandar (Member) left a comment:

Can you think through the following scenarios and ensure things work as expected.

  • Across clustering operations
  • Across multi writer scenarios.

I am yet to review the MOR relation changes; the write handle changes themselves look good. TBH, it's cool that such major functionality can be implemented end-to-end with a small LOC count.

@danny0405 do you want to take a pass at this


@@ -399,9 +447,57 @@ protected void writeIncomingRecords() throws IOException {
}
}

protected GenericData.Record cdcRecord(CDCOperationEnum operation, String recordKey, String partitionPath,
Member:

RFC-46 is moving away from GenericRecord as the canonical data record. So we may want to move in that direction as well. We need to sequence the two efforts correctly.

@@ -114,6 +118,8 @@ protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String
HoodieTable<T, I, K, O> hoodieTable, Option<Schema> overriddenSchema,
TaskContextSupplier taskContextSupplier) {
super(config, Option.of(instantTime), hoodieTable);
this.keyFiled = config.populateMetaFields() ? HoodieRecord.RECORD_KEY_METADATA_FIELD
Member:

typo: keyField

throws Exception {
String jsonStr = new String(bytes, StandardCharsets.UTF_8);
if (jsonStr.isEmpty()) {
return null;
Member:

Please avoid using null as a return value.
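
A minimal sketch of the suggested alternative, using Hudi's own Option type instead of null; the method and class names are illustrative:

import java.nio.charset.StandardCharsets;
import org.apache.hudi.common.util.Option;

// Sketch: signal "no data" with Option.empty() rather than null, so callers
// are forced to handle the empty case explicitly.
class JsonParseSketch {
  static Option<String> parseJsonString(byte[] bytes) {
    String jsonStr = new String(bytes, StandardCharsets.UTF_8);
    return jsonStr.isEmpty() ? Option.empty() : Option.of(jsonStr);
  }
}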

* Parse the bytes of a deltacommit, and get the base file and the log files belonging to
* the provided file group.
*/
public static Pair<String, List<String>> getFileSliceForFileGroupFromDeltaCommit(
Member:

Doesn't any of the existing code do this?

Contributor Author:

No; let me do this first. There is no simple, low-code way to do it. I think it deserves a new PR.


@@ -229,6 +230,10 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
HoodieLogBlock logBlock = logFormatReaderWrapper.next();
final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
totalLogBlocks.incrementAndGet();
if (logBlock.getBlockType() == CDC_DATA_BLOCK) {
Member:

If the data block is rolled back, or the commit is rolled back, is the CDC block skipped correctly? Can we write some tests to cover these scenarios?

Contributor Author:

OK, let me add some UTs.

@vinothchandar (Member)

@YannByron Can we rework this by making CDC a special mode of inc query?

@YannByron (Contributor Author)

Can you think through the following scenarios and ensure things work as expected.

  • Across clustering operations
  • Across multi writer scenarios.

The TestCDCDataFrameSuite UT in this PR covers the scenario across clustering operations. But the multi-writer scenario is not covered, and I think it may not be necessary.

@YannByron (Contributor Author) commented Jul 20, 2022

@YannByron Can we rework this by making CDC a special mode of inc query?

Actually, I think the incremental query is a special mode of CDC: the incremental query keeps the normal format and returns less of the CDC info.
But that's fine. If we want to do this, we just modify the configs that users need to be aware of, like this:

The current way sets:

hoodie.datasource.query.type=cdc

The modified way sets:

hoodie.datasource.query.type=incremental
hoodie.datasource.incremental.output=cdc

Is this OK? cc @vinothchandar
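
A hedged sketch of what a read with the proposed knobs could look like; hoodie.datasource.incremental.output is only the name proposed in this comment, not a released option, and the begin-instant value and table path are placeholders:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CdcIncrementalReadSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("cdc-read").master("local[*]").getOrCreate();
    // Incremental query whose output format is the CDC one proposed above.
    Dataset<Row> changes = spark.read().format("hudi")
        .option("hoodie.datasource.query.type", "incremental")
        .option("hoodie.datasource.incremental.output", "cdc") // proposed name
        .option("hoodie.datasource.read.begin.instanttime", "20220720000000")
        .load("/path/to/hudi_table");
    changes.show(false);
  }
}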

@danny0405 (Contributor) commented Jul 20, 2022

Can you think through the following scenarios and ensure things work as expected.

  • Across clustering operations
  • Across multi writer scenarios.

I am yet to review the MOR relation changes; the write handle changes themselves look good. TBH, it's cool that such major functionality can be implemented end-to-end with a small LOC count.

@danny0405 do you want to take a pass at this

Yeah, I need to do a detailed review of this part. We don't seem to have even reached consensus on the design, and I don't understand why the RFC doc was merged; I was confused at first.

@xushiyan (Member) commented Jul 20, 2022

we seem not even make consensus on the design and i don't understand why the RFC doc was merged, confused firstly.

@danny0405 I've explained in #5436 (comment). Also, we've messaged with @YannByron over WeChat about this, saying we should make a follow-up PR to update. It's not meant to ignore any unresolved questions. We'll make sure previous discussion points are linked and resolved in the updating PR.

@codope codope added priority:critical production down; pipelines stalled; Need help asap. and removed priority:blocker labels Jul 20, 2022
@YannByron YannByron force-pushed the master_cdc_code branch 2 times, most recently from fae8816 to 281d69e Compare July 23, 2022 05:46
@xushiyan xushiyan changed the title [RFC-51][HUDI-3478] Hudi CDC [HUDI-3478] Support CDC for Spark in Hudi Jul 24, 2022
@xushiyan xushiyan added pr:wip Work in Progress/PRs big-needle-movers and removed priority:critical production down; pipelines stalled; Need help asap. labels Jul 24, 2022
@xushiyan xushiyan removed the pr:wip Work in Progress/PRs label Jul 29, 2022
@YannByron (Contributor Author)

@hudi-bot run azure

@hudi-bot commented Aug 9, 2022

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@YannByron YannByron closed this Aug 17, 2022
@YannByron (Contributor Author)

Reopen: #6476
