Fixing number of insert buckets to be generated by rounding off to the closest greater integer #500
Conversation
Force-pushed from 326a1e2 to bb9c3ed.
Okay, fair enough. This was a conscious decision I made at that point to round down. Conversely, did you notice any correlation between doing this and observing the null/empty merge handles? Because rounding up can technically cause that.
Force-pushed from bb9c3ed to e6c2c58.
Nope, the rounding-up logic isn't running for the topics that are failing; they still use the current code.
```diff
@@ -726,7 +726,7 @@ private void assignInserts(WorkloadProfile profile) {
       insertRecordsPerBucket = config.getParquetMaxFileSize() / averageRecordSize;
     }

-    int insertBuckets = (int) Math.max(totalUnassignedInserts / insertRecordsPerBucket, 1L);
+    int insertBuckets = (int) Math.max(Math.ceil((1.0) * totalUnassignedInserts / insertRecordsPerBucket), 1L);
```
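For illustration, a minimal standalone sketch (not the Hudi method itself; the workload numbers are made up) of why the 1.0 multiplier matters here: without it, the division of two longs truncates before Math.ceil ever runs, so the bucket count would still be rounded down.

```java
public class CeilOnLongs {
    public static void main(String[] args) {
        long totalUnassignedInserts = 190_000L;  // hypothetical: 1.9 buckets' worth
        long insertRecordsPerBucket = 100_000L;  // hypothetical bucket capacity

        // Long division truncates *before* ceil runs, so this is still 1.0.
        double stillFloored = Math.ceil(totalUnassignedInserts / insertRecordsPerBucket);

        // Promoting to double first lets ceil actually round up to 2.0.
        double roundedUp = Math.ceil((1.0) * totalUnassignedInserts / insertRecordsPerBucket);

        System.out.println(stillFloored + " vs " + roundedUp);  // 1.0 vs 2.0
    }
}
```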
Did you intend to do Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket) instead? Also, with ceil, do we still need Math.max?
Actually, we don't need Math.max; ceil will always give a minimum of 1, and we already have an if condition for > 0.
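Two small points behind that exchange, shown as a standalone sketch rather than the actual Hudi code: Java evaluates 1.0 * a / b left to right, so it is already (1.0 * a) / b and the suggested parentheses only help readability; and as long as the unassigned insert count is positive, the ratio is positive and Math.ceil returns at least 1.0, which is why the Math.max guard becomes redundant given the surrounding > 0 check.

```java
public class CeilGuardCheck {
    public static void main(String[] args) {
        long inserts = 50_000L;      // hypothetical positive insert count
        long perBucket = 100_000L;   // hypothetical bucket capacity

        // Left-to-right evaluation: (1.0) * a / b is the same as (1.0 * a) / b.
        System.out.println((1.0) * inserts / perBucket == (1.0 * inserts) / perBucket);  // true

        // For inserts > 0 the ratio is > 0, so ceil is at least 1.0 and max(..., 1L) is a no-op.
        int buckets = (int) Math.ceil((1.0 * inserts) / perBucket);
        System.out.println(buckets);  // 1, even though the ratio is only 0.5
    }
}
```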
```diff
-  private List<HoodieCopyOnWriteTable.InsertBucket> testUpsertPartitioner(int smallFileSize, int numInserts,
+  private UpsertPartitioner testUpsertPartitioner(int smallFileSize, int numInserts,
       int numUpdates, int fileSize, boolean autoSplitInserts) throws Exception {
     final String testPartitionPath = "2016/09/26";
```
Rename to getUpsertPartitioner?
done
Force-pushed ("…e closest greater integer") from e6c2c58 to 4934fd9.
@vinothchandar addressed your comments.
In scenarios where the bucket count is, say, 1.9, the current code rounds down to the closest lower integer, so insertBucketCount would be 1. That routes two buckets' worth of data to a single bucket (hence a single Spark partition) and serializes the writes to parquet. If the bucket count is instead rounded up to the closest greater integer, in this case 2, records are routed to two different Spark partitions, so the parquet files are written in parallel and Spark task times drop.
We noticed this in our ingestion jobs, where the job sometimes takes a long time because of this issue (more evident when writing out 1 GB parquet files).
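To make the scheduling effect concrete, a rough sketch with made-up numbers (again not the real UpsertPartitioner, just the arithmetic): 1.9 buckets' worth of inserts collapses to a single bucket with floor, but spreads across two buckets, and therefore two Spark partitions writing in parallel, with ceil.

```java
public class BucketCountEffect {
    public static void main(String[] args) {
        long totalUnassignedInserts = 190_000L;   // hypothetical: 1.9 buckets' worth
        long insertRecordsPerBucket = 100_000L;   // hypothetical bucket capacity

        int floored = (int) Math.max(totalUnassignedInserts / insertRecordsPerBucket, 1L);       // 1
        int ceiled  = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket);  // 2

        // Each bucket maps to one Spark partition, i.e. one parquet writer.
        System.out.printf("floor -> %d bucket(s), ~%d records per writer (serial)%n",
                floored, totalUnassignedInserts / floored);
        System.out.printf("ceil  -> %d bucket(s), ~%d records per writer (parallel)%n",
                ceiled, totalUnassignedInserts / ceiled);
    }
}
```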
@vinothchandar @bvaradar