Batch loading sometimes misses records #188
base: master
When sink tasks write GCS files in parallel, the GCS file names can currently be duplicated. This PR appends the first offset of the batch to the GCS file name to make it unique.
When sink tasks write GCS files into the same folder, one process can delete another table's GCS files, so this PR uses a separate folder for each table.
This PR also waits for the status of the BigQuery load job and logs the number of rows the job loaded successfully.
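The per-table folder separation could look roughly like the sketch below. It is only an illustration under assumptions: the `table/blobName` layout, the class `TableFolders`, and both method names are hypothetical and not taken from the connector's code. It shows that a cleanup step limited to one table's prefix does not select another table's files.

```java
import java.util.List;
import java.util.stream.Collectors;

final class TableFolders {
    // Hypothetical layout: one folder (object-name prefix) per destination table.
    static String blobPath(String table, String blobName) {
        return table + "/" + blobName;
    }

    // Select only the blobs under the given table's folder.
    // The trailing slash keeps "orders" from matching "orders_v2".
    static List<String> blobsForTable(List<String> allBlobs, String table) {
        String prefix = table + "/";
        return allBlobs.stream()
                .filter(name -> name.startsWith(prefix))
                .collect(Collectors.toList());
    }
}
```

A delete step that iterates over `blobsForTable(allBlobs, table)` instead of the whole bucket listing would leave the other tables' pending files untouched.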
Hey @jeonguihyeong, could you reword your PR description? It's very hard to understand your message. Thanks.
@@ -247,7 +249,8 @@ public void put(Collection<SinkRecord> records) {
 TableWriterBuilder tableWriterBuilder;
 if (config.getList(BigQuerySinkConfig.ENABLE_BATCH_CONFIG).contains(record.topic())) {
 String topic = record.topic();
-String gcsBlobName = topic + "_" + uuid + "_" + Instant.now().toEpochMilli();
+long offset = record.kafkaOffset();
+String gcsBlobName = topic + "_" + uuid + "_" + Instant.now().toEpochMilli()+"_"+records.size()+"_"+offset;
Would having a test case for validating that parallel puts create different files with the right offset help?
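One possible shape for such a test, as a rough sketch: the helper below mirrors the name format from the diff (topic, UUID, timestamp, batch size, first offset) and builds names on parallel threads for batches that share a topic, UUID, and timestamp. `BlobNames`, `blobName`, and `namesForOffsets` are hypothetical names used for illustration, not the connector's API, and the sketch assumes all batches come from a single partition, where offsets do not repeat.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class BlobNames {
    // Same pieces as the changed line in the diff, joined with underscores.
    static String blobName(String topic, String uuid, long epochMilli,
                           int batchSize, long firstOffset) {
        return topic + "_" + uuid + "_" + epochMilli + "_" + batchSize + "_" + firstOffset;
    }

    // Build one name per first offset on parallel threads and collect them in a set,
    // so any colliding names collapse into a single entry.
    static Set<String> namesForOffsets(String topic, String uuid, long epochMilli,
                                       int batchSize, List<Long> firstOffsets) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (long offset : firstOffsets) {
            pool.execute(() -> names.add(blobName(topic, uuid, epochMilli, batchSize, offset)));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("name-building tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        }
        return names;
    }
}
```

A test could then assert that the set size equals the number of distinct first offsets. With the previous format, which has no offset component, batches sharing a topic, UUID, and millisecond timestamp would all map to the same name.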
GCP side
- You can enable GCS object versioning and check how many times each GCS object name was written.
BigQuery sink
- The concept comes from the S3 sink; I adapted it for the BigQuery sink.
It uses the first offset of the records, and the names do not collide because Kafka message offsets are unique.
If you need more details, contact me.
We have not been able to reproduce the missing records issue on our end.
Update GCSToBQLoadRunnable.java
Update BigQuerySinkTask.java