[SUPPORT] How to configure Spark and Flink to write MOR tables using bucket indexes? #11946

Open
xiaofan2022 opened this issue Sep 14, 2024 · 12 comments

Comments

@xiaofan2022

I want to use both Flink and Spark to write to a MOR table with the CONSISTENT_HASHING bucket index. The Spark bulk load of the full data set is very fast, but the Flink incremental writes are very slow (about 100 records/s).
Spark SQL:

CREATE TABLE test.tableA ()
USING hudi
TBLPROPERTIES (
'connector' = 'hudi',
'index.type'='BUCKET',
'hoodie.index.type'='BUCKET',
'hoodie.index.bucket.engine'='CONSISTENT_HASHING',
'hoodie.datasource.write.recordkey.field' = '',
'path' = '',
'preCombineField' = 'create_time',
'precombine.field' = 'create_time',
'primaryKey' = '',
'table.type' = 'MERGE_ON_READ',
'write.rate.limit'='10000', -- Flink config
'write.tasks'='2', -- Flink config
'write.utc-timezone'='false',
'type' = 'mor');

(image: flink_slow) How can the Flink write be optimized?
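
For reference, a minimal sketch of how the Flink-side DDL for the same table might look, assuming the Flink Hudi connector options mirror the TBLPROPERTIES above; the column list, primary key, and path are placeholders, not taken from the issue:

CREATE TABLE tableA_flink (
  id STRING,
  create_time TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = '...',                       -- same table path the Spark table was created with
  'table.type' = 'MERGE_ON_READ',
  'index.type' = 'BUCKET',
  'hoodie.index.bucket.engine' = 'CONSISTENT_HASHING',
  'precombine.field' = 'create_time',
  'write.tasks' = '2',
  'write.rate.limit' = '10000'
);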

@danny0405
Contributor

Did you have a chance to check the thread dump of the operators?

@xiaofan2022
Author

xiaofan2022 commented Sep 15, 2024

"consistent_bucket_write: test.fin_ipr_inmaininfo_test (1/2)#0" Id=89 TIMED_WAITING on java.util.LinkedList@37d9fd7
at java.lang.Object.wait(Native Method)
- waiting on java.util.LinkedList@37d9fd7
at org.apache.hadoop.hdfs.DataStreamer.waitForAckedSeqno(DataStreamer.java:924)
at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:692)
at org.apache.hadoop.hdfs.DFSOutputStream.hsync(DFSOutputStream.java:587)
at org.apache.hadoop.fs.FSDataOutputStream.hsync(FSDataOutputStream.java:145)
at org.apache.hadoop.fs.FSDataOutputStream.hsync(FSDataOutputStream.java:145)
at org.apache.hudi.common.table.log.HoodieLogFormatWriter.flush(HoodieLogFormatWriter.java:261)
at org.apache.hudi.common.table.log.HoodieLogFormatWriter.appendBlocks(HoodieLogFormatWriter.java:194)
at org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:479)
at org.apache.hudi.io.HoodieAppendHandle.doAppend(HoodieAppendHandle.java:450)
at org.apache.hudi.table.action.commit.delta.BaseFlinkDeltaCommitActionExecutor.handleUpdate(BaseFlinkDeltaCommitActionExecutor.java:54)
at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.handleUpsertPartition(BaseFlinkCommitActionExecutor.java:191)
at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.execute(BaseFlinkCommitActionExecutor.java:109)
at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.execute(BaseFlinkCommitActionExecutor.java:71)
at org.apache.hudi.table.action.commit.FlinkWriteHelper.write(FlinkWriteHelper.java:77)
at org.apache.hudi.table.action.commit.delta.FlinkUpsertDeltaCommitActionExecutor.execute(FlinkUpsertDeltaCommitActionExecutor.java:51)
at org.apache.hudi.table.HoodieFlinkMergeOnReadTable.upsert(HoodieFlinkMergeOnReadTable.java:73)
at org.apache.hudi.client.HoodieFlinkWriteClient.upsert(HoodieFlinkWriteClient.java:148)
at org.apache.hudi.sink.StreamWriteFunction.lambda$initWriteFunction$1(StreamWriteFunction.java:192)
at org.apache.hudi.sink.StreamWriteFunction$$Lambda$1275/530894042.apply(Unknown Source)
at org.apache.hudi.sink.bucket.ConsistentBucketStreamWriteFunction.lambda$writeBucket$0(ConsistentBucketStreamWriteFunction.java:80)
at org.apache.hudi.sink.bucket.ConsistentBucketStreamWriteFunction$$Lambda$1599/372489614.apply(Unknown Source)
at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
at java.util.Collections$2.tryAdvance(Collections.java:4717)
at java.util.Collections$2.forEachRemaining(Collections.java:4725)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.apache.hudi.sink.bucket.ConsistentBucketStreamWriteFunction.writeBucket(ConsistentBucketStreamWriteFunction.java:81)
at org.apache.hudi.sink.StreamWriteFunction.lambda$flushRemaining$7(StreamWriteFunction.java:467)
at org.apache.hudi.sink.StreamWriteFunction$$Lambda$1593/379154034.accept(Unknown Source)
at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
at org.apache.hudi.sink.StreamWriteFunction.flushRemaining(StreamWriteFunction.java:463)
at org.apache.hudi.sink.StreamWriteFunction.snapshotState(StreamWriteFunction.java:137)
at org.apache.hudi.sink.bucket.ConsistentBucketStreamWriteFunction.snapshotState(ConsistentBucketStreamWriteFunction.java:69)
at org.apache.hudi.sink.common.AbstractStreamWriteFunction.snapshotState(AbstractStreamWriteFunction.java:167)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:88)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:222)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:336)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:715)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:350)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1299)
...

Could HDFS write performance be the problem? When Spark uses the simple index and Flink uses the bucket index, Flink writes quickly, around 1k-2k records/s.

@danny0405
Contributor

The performance should be very close for consistent hashing and simple hashing, but from the stack trace it looks like appending to the log files is what takes the time.

@xiaofan2022
Author

So how can this be optimized? This speed is too slow.

@danny0405
Contributor

@beyond1920 can you help with the performance issue here?

@danny0405
Contributor

@xiaofan2022 Did you schedule the clustering for expanding the consistent hashing ring already? Did you check the tablePath/.bucket_index/consistent_hashing_metadata for the number of consistent hashing nodes?

@xiaofan2022
Author

xiaofan2022 commented Sep 19, 2024

hdfs dfs -cat hdfs://nameservice1/apps/spark/warehouse/test.db/file_test/.hoodie/.bucket_index/consistent_hashing_metadata/00000000000000.hashing_meta | grep "value" | wc -l

result=>>>256

@danny0405
Contributor

So you have 256 initial buckets?

@xiaofan2022
Author

Yes, I set 'hoodie.bucket.index.max.num.buckets'='32' and 'hoodie.bucket.index.min.num.buckets'='4', but found that there are still 256 buckets.

@danny0405
Contributor

Yeah, let's figure out the reason; too many buckets would not perform well for streaming writes.
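
A likely explanation, offered as an assumption to verify against your Hudi version: 'hoodie.bucket.index.min.num.buckets' and 'hoodie.bucket.index.max.num.buckets' only bound the bucket resizing that clustering performs for consistent hashing, while the initial bucket count comes from 'hoodie.bucket.index.num.buckets', which defaults to 256 when left unset. A sketch of the table properties with all three set explicitly:

'hoodie.index.type' = 'BUCKET',
'hoodie.index.bucket.engine' = 'CONSISTENT_HASHING',
'hoodie.bucket.index.num.buckets' = '4',      -- initial bucket count; defaults to 256 if unset
'hoodie.bucket.index.min.num.buckets' = '4',  -- lower bound for resizing via clustering
'hoodie.bucket.index.max.num.buckets' = '32', -- upper bound for resizing via clustering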

@ad1happy2go
Collaborator

@xiaofan2022 Any updates on this ticket? Were you able to find out the reason why we see 256 buckets?

ad1happy2go moved this from ⏳ Awaiting Triage to 👤 User Action in Hudi Issue Support, Oct 17, 2024
@xiaofan2022
Author

I first create the table through Spark and import the full data set. Then Flink updates incremental data in real time, but the default bucket number in Spark is 4.
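
For what it's worth, a sketch of how the initial bucket count could be pinned at table-creation time so that the Spark bulk load and the Flink incremental writes agree on the layout; the column list, primary key, and path are placeholders, and the option names should be verified against your Hudi version:

CREATE TABLE test.tableA (...)
USING hudi
TBLPROPERTIES (
  'type' = 'mor',
  'primaryKey' = '...',
  'preCombineField' = 'create_time',
  'hoodie.index.type' = 'BUCKET',
  'hoodie.index.bucket.engine' = 'CONSISTENT_HASHING',
  'hoodie.bucket.index.num.buckets' = '4'  -- explicit initial bucket count instead of relying on the default
);

Note that once the consistent hashing metadata has been written under .hoodie/.bucket_index, the effective bucket count lives in that metadata, so changing the property on an existing table would presumably require clustering to resize the ring rather than a simple property update.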
