Fix pane info in BigQuery load job id #28272
Run PostCommit_Java_Dataflow |
Run PostCommit_Java_DataflowV2 |
There are still multiple code paths that rely on the pane info, e.g. the write_disposition and create_disposition settings: https://github.com/apache/beam/blob/3ff66d38c41fde475f71254d889ba46440904238/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java#L194C67-L194C67 There might be other issues after fixing this particular one. Let me think more about it. |
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
5c40094
to
0ed4c78
Compare
There were test failures due to periodic impulse timing, so I redesigned the test. If it passes, this is ready for review. |
c.sideInput(jobIdToken), finalTableDestination, -1, c.pane().getIndex()); | ||
c.sideInput(jobIdToken), finalTableDestination, -1); | ||
|
||
if (isFirstPane) { |
This is another code path that relies on pane info. I checked that the upstream transform has a GBK and no Reshuffle in between: https://github.com/apache/beam/blob/0ed4c78a799cf5a6cc6a0b40b23ca498096769c5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java#L404C5-L404C5
so this one should be fine. Nevertheless, it is good to add logging to monitor it.
Unrelated to this PR but we're actually seeing a bug due to relying on the pane index in this code path: #28309
Run PostCommit_Java_DataflowV2 |
Run PostCommit_Java_Dataflow |
Although the Dataflow PostCommit runs have multiple quota-exceeded failures, the BigQuery tests all passed: https://ci-beam.apache.org/job/beam_PostCommit_Java_DataflowV2_PR/218/testReport/org.apache.beam.sdk.io.gcp.bigquery/ |
R: @ahmedabu98 will fix spotless in next iteration |
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control |
* {@link #startAt} and {@link #stopAt}, as the first timestamp is determined at run time | ||
* (pipeline starts processing). | ||
*/ | ||
public PeriodicImpulse stopAfter(Duration duration) { |
Mark with @Internal?
P.S. also add to catchUpToNow()
added
// add randomness to jobId to avoid conflict | ||
String jobId = | ||
String.format("%s_%s_%s", prefix, destinationHash, randomUUIDString().substring(0, 16)); |
I'm worried this may cause duplication of data due to bundle retry.
Let's say a bundle fails during load/copy execution and the BQ job was successful. Beam would process the bundle again and these lines will create a fresh job ID. Under previous circumstances, this job ID is constructed in a deterministic way and would be recognized by BQ as a recently successful job so will be ignored. But now since the ID is always new, BQ will execute the job and we will end up with duplicate data.
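The retry concern above can be illustrated with a small, self-contained sketch. It does not use Beam or the BigQuery client: `FakeJobService`, `deterministicJobId`, `randomJobId`, and the `"beam_load"` / `"abc123"` values are hypothetical stand-ins, and the service simply models the behavior described in the comment (a job ID that was already used is not executed again).

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

public class JobIdRetryDemo {
  /** Hypothetical stand-in for a job service that deduplicates by job ID. */
  static class FakeJobService {
    private final Set<String> seenJobIds = new HashSet<>();
    private int loadsExecuted = 0;

    void submitLoad(String jobId) {
      // Set.add returns false for an ID that was already submitted, so the load is skipped.
      if (seenJobIds.add(jobId)) {
        loadsExecuted++;
      }
    }

    int loadsExecuted() {
      return loadsExecuted;
    }
  }

  /** Assumed layout: the same inputs always give the same ID. */
  static String deterministicJobId(String prefix, String destinationHash, long paneIndex) {
    return String.format("%s_%s_%05d", prefix, destinationHash, paneIndex);
  }

  /** Mirrors the shape of the proposed change: a random suffix on every call. */
  static String randomJobId(String prefix, String destinationHash) {
    String suffix = UUID.randomUUID().toString().replace("-", "").substring(0, 16);
    return String.format("%s_%s_%s", prefix, destinationHash, suffix);
  }

  /** Processes the same bundle twice (first attempt plus one retry). */
  static int loadsAfterRetry(boolean deterministic) {
    FakeJobService service = new FakeJobService();
    for (int attempt = 0; attempt < 2; attempt++) {
      String jobId =
          deterministic
              ? deterministicJobId("beam_load", "abc123", 0)
              : randomJobId("beam_load", "abc123");
      service.submitLoad(jobId);
    }
    return service.loadsExecuted();
  }

  public static void main(String[] args) {
    System.out.println("deterministic: " + loadsAfterRetry(true)); // prints "deterministic: 1"
    System.out.println("random: " + loadsAfterRetry(false)); // prints "random: 2"
  }
}
```

In this model the deterministic ID makes the retry a no-op, while the random suffix causes the load to run twice, which is the duplication the reviewer is worried about.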
return GenerateSequence.from(0) | ||
.to(rowCount) | ||
.withRate(1, Duration.millis(timestampIntervalInMilliseconds)); | ||
static class UnboundedStream extends PTransform<PBegin, PCollection<Long>> { |
Nice, this fixes the test cases that were supposed to be streaming but weren't.
Run PostCommit_Java_DataflowV2 |
dataflow v2 postcommit passed; java precommit known flink test flakiness. PTAL @ahmedabu98 |
LOG.info( | ||
"Setup write disposition {}, create disposition {} for first pane BigQuery job {}", | ||
writeDisposition, | ||
createDisposition, | ||
jobIdPrefix); | ||
} else { | ||
LOG.debug("Setup write disposition {}, create disposition {} for BigQuery job {}", | ||
writeDisposition, createDisposition, jobIdPrefix); | ||
LOG.debug( | ||
"Setup write disposition {}, create disposition {} for BigQuery job {}", | ||
writeDisposition, | ||
createDisposition, | ||
jobIdPrefix); |
Can we include the bigquery table destination here too?
The destination name (without hashing) may contain PII. In the past I have put logs that may contain PII at debug level so that they are not stored by default (unless the log level is overridden to debug).
(note: the final table destination is hashed into the jobId)
abstract Boolean isFirstPane(); | ||
|
||
abstract Long getPaneIndex(); |
Why are we maintaining pane index coming out of write tables? I don't see that we're using it anywhere downstream
will revert the WriteTable.Result changes. Originally it was just to keep WritePartition.Result and WriteTable.Result similar.
return partitionResult.isFirstPane(); | ||
} | ||
|
||
public long paneIndex() { |
I am going to preserve the change to PendingJobData, as the pane index info would be helpful for resolving a possible race condition among CREATE_TRUNCATE pending jobs
Do you mean for debugging purposes? Will note that paneIndex() is still not being called anywhere
Run PostCommit_Java_Dataflow |
Run PostCommit_Java_DataflowV2 |
LGTM if tests pass
Are you sure this is right? Isn't the purpose of the job id to prevent duplicates? |
I'm not sure what you mean by random upstream keys. Reshuffle
currently maps KV -> KV, so no keys are changed. The only difference is
that the keys are now spread out on different workers, which is a semantic
no op.
You may be thinking of Reshuffle.viaRandomKey, which is a bit different.
…On Tue, Sep 12, 2023 at 6:33 AM Kenn Knowles ***@***.***> wrote:
Actually each GBK has its own sequence of trigger firings, and when it
comes to reshuffle there are two possibilities:
1. The reshuffle does a GBK then explodes it (per its expansion) in
which case the sequence of pane indices come from the Reshuffle/always
trigger on that GBK.
2. The reshuffle is a semantics "no op" at the PCollection level, in
which case the pane indices would be preserved from an upstream
aggregation, but reshuffle results in bundles from random upstream keys so
they are deceptive.
Right. I was, indeed, thinking of reshuffle via random key. But what I'm saying does apply to Reshuffle by assigned keys. I have been a person who always says that Reshuffle is a semantic noop. If we should preserve that we need the v2 and the SDK implementation to record the pane info (aka reify If you have |
Notably we only have tests like beam/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java Line 115 in cf0cf3b
Considering that reifying and testing against a full windowed value would have been somewhat of a default thing to do, I would guess the original intent was very aware of this issue. |
* Introduce PeriodicImpulse.stopAfter() * Use it in streaming FILE_LOAD integration test * Fix unit test assert to consider randomness in jobId
ed2bf05
to
d1dd0d3
Compare
rebased onto master to test #28312 |
Run PostCommit_Java_DataflowV2 |
Test Results 1 269 files + 1 217 1 269 suites +1 217 2h 44m 14s ⏱️ - 8h 29m 18s Results for commit 43f014c. ± Comparison against base commit 220cae7. This pull request removes 158 and adds 10612 tests. Note that renamed tests count towards both.
This pull request removes 29 skipped tests and adds 68 skipped tests. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results. |
Tests now exercised and passed on Dataflow Runner V2: https://ci-beam.apache.org/job/beam_PostCommit_Java_DataflowV2_PR/226/testReport/org.apache.beam.sdk.io.gcp.bigquery/FileLoadsStreamingIT/ |
@reuvenlax as @Abacn mentioned, we broke update-compatibility recently in #28312 so that we can unblock dynamic destinations with copy jobs (#28309). Do we have the green light to merge these changes? |
Run Java PreCommit |
FYI - https://issues.apache.org/jira/browse/BEAM-7195 was caused by the
inverse issue. The old code assumed that Reshuffle would produce pane
indices, but of course it just forwards them.
Let's wait until we figure out the correct behavior. Very few users will be
exercising this from runner v2 today, so I would rather be cautious here -
especially if the right answer is to fix runner v2.
…On Fri, Sep 15, 2023 at 3:33 AM Ahmed Abualsaud ***@***.***> wrote:
@reuvenlax <https://github.com/reuvenlax> as @Abacn
<https://github.com/Abacn> mentioned, we broke update-compatibility
recently in #28312 <#28312> so that we
can unblock dynamic destinations with copy jobs (#28309
<#28309>). Do we have the green
light to merge these changes?
If Dataflow folks decide Runner V2 should preserve pane indices, that
support may still take some time to implement. In the meantime, this
ensures file loads streaming works on the runner (IIUC it previously never
did)
Yea let's sync on this. I think I've come around but I want to get it on dev@ in at least a 1 pager for what the spec is. If it turns out to not be dangerous to reinstate, and we can regain the semantic property "reshuffle is a semantic noop" then that is pretty desirable, so that runners can freely add and remove reshuffles. Noting that here is where Dataflow v1 implemented reshuffle and re-instated the pre-reshuffle pane: Line 61 in 3402490
Noting that Beam's reference implementation, the closest thing we have to a spec, does not preserve the pane but will produce a sequence of increasing panes. |
@kennknowles thanks for pointing to the related code path for the Dataflow legacy worker. I now understand that the Dataflow legacy worker applies a special PTransform override for Reshuffle. For portable pipelines, is it true that there is no longer a similar PTransform override, and that a possible fix would change Java core's Reshuffle to also recover the pane (it currently recovers the timestamp)?
|
I expect there is a similar override inside the Dataflow service and Unified Worker. The implementation in the SDK would be slow and just for reference, if it was executed. A reshuffle really should not have to cross the Fn API. |
But yes if we make the SDK recover the pane info also, then that would make the SDK (direct runner, prism, etc) match Dataflow v1. This would probably not fix v2, but it would fix "the spec". |
OK so when I thought about it I agree with everyone else that the reference implementation is wrong and that the right behavior is to be the same as the identity function on PCollections. That means keeping the pane info exactly. I wrote the dev list about it. I think it is probably such a boring thread that it will not get a lot of comment. |
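To make the two candidate behaviors concrete, here is a minimal sketch that models them on plain Java lists rather than on real PCollections. The `Element` class and both method names are hypothetical and exist only for illustration; the "regrouping" variant is an assumption about what "a sequence of increasing panes" looks like per key, not a copy of the SDK's Reshuffle expansion.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReshufflePaneDemo {
  /** Hypothetical minimal element: key, value, and the pane index attached upstream. */
  static final class Element {
    final String key;
    final String value;
    final long paneIndex;

    Element(String key, String value, long paneIndex) {
      this.key = key;
      this.value = value;
      this.paneIndex = paneIndex;
    }
  }

  /** Identity semantics: every element keeps the pane index it arrived with. */
  static List<Element> identityReshuffle(List<Element> input) {
    return new ArrayList<>(input);
  }

  /** Regrouping semantics (assumed model): each key gets a fresh pane sequence 0, 1, 2, ... */
  static List<Element> regroupingReshuffle(List<Element> input) {
    Map<String, Long> firingsPerKey = new HashMap<>();
    List<Element> output = new ArrayList<>();
    for (Element e : input) {
      // merge returns the updated count, so subtracting 1 gives a zero-based pane index.
      long pane = firingsPerKey.merge(e.key, 1L, Long::sum) - 1;
      output.add(new Element(e.key, e.value, pane));
    }
    return output;
  }

  public static void main(String[] args) {
    List<Element> upstream = new ArrayList<>();
    upstream.add(new Element("a", "x", 3));
    upstream.add(new Element("a", "y", 4));
    upstream.add(new Element("b", "z", 7));

    for (Element e : identityReshuffle(upstream)) {
      System.out.println("identity   " + e.key + " pane=" + e.paneIndex);
    }
    for (Element e : regroupingReshuffle(upstream)) {
      System.out.println("regrouping " + e.key + " pane=" + e.paneIndex);
    }
  }
}
```

Under the identity model, downstream code sees the upstream pane indices 3, 4, 7 unchanged; under the regrouping model it sees 0, 1, 0, so any logic keyed on the pane index behaves differently depending on which model the runner implements.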
c.sideInput(loadJobIdPrefixView), | ||
tableDestination, | ||
partition, | ||
element.getValue().getPaneIndex()); |
My concern is that if we have this:
GBK -> processing -> reshuffle -> this DoFn
Can you tell me which case we are in?
- The processing left the key the same so the reshuffle is just for checkpoint
- The processing changed the key so that the current element key + pane index is no longer unique
Well it's a little bit more subtle. It is the same key, but became ShardedKey after processing
After GBK, key is DestinationT
After processing, key is ShardedKey (shard is added in WritePartitions)
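One way to read this exchange: once the key carries a shard, the destination plus pane index alone may no longer identify an element, so the job ID needs the partition as well. The sketch below checks that with a hypothetical ID layout; `jobId`, `distinctIds`, and the sample values are assumptions for illustration and are not taken from BatchLoads.

```java
import java.util.HashSet;
import java.util.Set;

public class ShardedJobIdDemo {
  /** Hypothetical job ID layout: prefix, destination hash, partition, pane index. */
  static String jobId(String prefix, String destinationHash, int partition, long paneIndex) {
    return String.format("%s_%s_%05d_%05d", prefix, destinationHash, partition, paneIndex);
  }

  /** Counts distinct IDs produced for every (partition, pane) pair of one destination. */
  static int distinctIds(int partitions, int panes, boolean includePartition) {
    Set<String> ids = new HashSet<>();
    for (int partition = 0; partition < partitions; partition++) {
      for (long pane = 0; pane < panes; pane++) {
        // Dropping the partition collapses all shards of a pane onto one ID.
        ids.add(jobId("beam_load", "abc123", includePartition ? partition : 0, pane));
      }
    }
    return ids.size();
  }

  public static void main(String[] args) {
    System.out.println("with partition: " + distinctIds(3, 2, true)); // prints "with partition: 6"
    System.out.println("without partition: " + distinctIds(3, 2, false)); // prints "without partition: 2"
  }
}
```

With three partitions and two panes, the ID stays unique per element only when the partition is part of it, which matches the arguments passed at the call site shown in the diff above.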
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions. |
This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |
Fixes #28219