Pass original message down through conversion for storage write api #31106
Conversation
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment.
Assigning reviewers. If you would like to opt out of this review, comment R: @robertwb for label java. Available commands:
The PR bot will only process comments in the main thread (not review comments).
@Abacn @ahmedabu98 could you take a look at this?
Thanks, the change lgtm. Have one thing to confirm (cc'd below)
In sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java:

```diff
@@ -52,16 +52,18 @@
 /** This {@link PTransform} manages loads into BigQuery using the Storage API. */
 public class StorageApiLoads<DestinationT, ElementT>
     extends PTransform<PCollection<KV<DestinationT, ElementT>>, WriteResult> {
-  final TupleTag<KV<DestinationT, StorageApiWritePayload>> successfulConvertedRowsTag =
-      new TupleTag<>("successfulRows");
+  final TupleTag<KV<DestinationT, KV<ElementT, StorageApiWritePayload>>>
```
Here the PTransform output element type is changed. Do we need some change in BigQueryTranslation to preserve upgrade compatibility? cc: @chamikaramj
Or is there a plan to set up a precommit test for BigQuery pipeline upgrades, so tests can auto-detect this (like the Kafka upgrade project)?
In theory, it's within the overall BQ transform, so it might work?
I think changing the output element type and the coder here could break streaming update compatibility in general. Have you considered using the updateCompatibilityVersion option?
beam/sdks/java/core/src/main/java/org/apache/beam/sdk/options/StreamingOptions.java, line 45 in c531f89: `String getUpdateCompatibilityVersion();`
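For context, a minimal sketch of how a pipeline author could set that option (the version string is illustrative, and a transform only changes behavior if it actually honors the flag):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;

public class UpdateCompatibleLaunch {
  public static void main(String[] args) {
    StreamingOptions options =
        PipelineOptionsFactory.fromArgs(args).as(StreamingOptions.class);
    // Pin behavior to the named Beam version so a streaming update against a
    // pipeline launched on that version can stay compatible ("2.55.0" is an
    // illustrative value, not a recommendation).
    options.setUpdateCompatibilityVersion("2.55.0");
    Pipeline pipeline = Pipeline.create(options);
    // ... build the pipeline here ...
    pipeline.run();
  }
}
```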
That would require us to maintain two implementations of Streaming Inserts, one with this change and one without, right? I think that would be prohibitive in general for Beam IOs.
This should be called out in CHANGES.md if we have to make these breaking changes. But I recommend updateCompatibilityVersion here.
Aside from update compatibility issues, doesn't this increase the data shuffled, since we are now shuffling both the write payload and the original elements? If so, it seems that we might want the previous behavior not just for older SDKs but also in cases where an error function requiring the original element is not configured.
Do we need to change the output type for successful writes? It seems like the original element is just being added for error handling path.
If I understand correctly the graph is not changing here, just the encoding of the elements and we're going from StorageApiWritePayload to KV<E, StorageApiWritePayload>. Would it be possible to have a special coder for KV<E, StorageApiWritePayload> such that it handles decoding previously coded StorageApiWritePayload as KV<null, StorageApiWritePayload>?
It seems like that could be done via a Schema in some way, since StorageApiWritePayload uses an AutoValue schema coder. I think the Dataflow backend would note that the schema is compatible in that case and allow the update to proceed.
Or a simpler route, could the element just be added as a nullable field to StorageApiWritePayload instead of changing to KV<E, StorageApiWritePayload>?
To share code, could we just switch to the new type throughout and have the element be null if not needed, or missing because it was encoded before this change? Since the new type is a superset of the old, it seems like compatibility with previous SDKs could be kept to the boundaries of the impl (if the above doesn't let you share completely).
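A rough sketch of the nullable-field route described above (all names here are hypothetical stand-ins, since StorageApiWritePayload's real fields aren't shown in this thread):

```java
import com.google.auto.value.AutoValue;
import javax.annotation.Nullable;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

// Hypothetical payload: because it is encoded with an AutoValue schema coder,
// appending a nullable field is a schema-compatible change, so bytes written
// before the change decode with the new field absent (null).
@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract class PayloadSketch {
  abstract byte[] getPayload(); // stand-in for the existing payload bytes

  @Nullable
  abstract byte[] getOriginalElementBytes(); // new: serialized original element, null for old rows

  static PayloadSketch of(byte[] payload, @Nullable byte[] originalElementBytes) {
    return new AutoValue_PayloadSketch(payload, originalElementBytes);
  }
}
```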
The idea of a coder that handles both decodings is very interesting to me. We could use the compat flag to indicate using the old coder instead of the new coder, which would preserve backwards compatibility much better. I'll try that, as I think that's a good way to do this.
If they use the new feature, we use one coder. If they don't, we use the old coder
Actually, I think this wouldn't even require the update compat flag.
As a side comment, this is another motivation to use schema coders more ubiquitously--adding another field is update compatible.
On another note, anything that involves shuffling more data in the main data path should be looked at carefully from a perf standpoint. We've gone to a lot of effort (e.g. with dynamic destinations) to ensure shuffling metadata doesn't become a perf impediment.
Also going to run some load tests to see if it has performance implications. Update: identical to 2.55.0 (51205) and 2.56.0 (47579).
LGTM as well. Just one suggestion for performance.
P.S. I see @Abacn's load test results, feel free to ignore
If I understand this correctly, we are now propagating both ElementT and StorageApiWritePayload, correct? Doesn't this double the amount of data being processed?
Assigning new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment R: @damondouglas for label java. Available commands:
Looks like this was approved but has conflicts that need to be resolved.
There was an unresolved discussion about maintaining update compatibility without duplicating a lot of code: #31106 (comment)
I'm also wanting to know whether there was something motivating this change, i.e. is there a Beam user that currently needs this? In addition to being careful about perf, this PR adds quite a bit of complexity to code that is already fairly complex.
Reminder, please take a look at this PR: @damondouglas @chamikaramj
Assigning new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment R: @kennknowles for label java. Available commands:
Reminder, please take a look at this PR: @kennknowles @ahmedabu98
Assigning new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment R: @damondouglas for label java. Available commands:
Reminder, please take a look at this PR: @damondouglas @Abacn
I see a few unreplied questions/comments in this PR.
Assigning new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment R: @kennknowles for label java. Available commands:
Enable users to specify an alternate way to generate the table row for the error output of BQIO's Storage Write API.
The user passes in a function of ElementT -> TableRow, and we maintain an index of the original elements passed in to BQIO. If the function is provided, we use it to generate the error row, instead of the default behavior of emitting the failure directly.
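For illustration, usage might look roughly like this (the withFormatRecordOnFailureFunction setter name and the MyEvent type are assumptions in this sketch, not confirmed API):

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.values.PCollection;

// MyEvent and the failure-function setter name are hypothetical stand-ins.
PCollection<MyEvent> events = /* upstream source */;
events.apply(
    BigQueryIO.<MyEvent>write()
        .to("my-project:my_dataset.my_table")
        .withFormatFunction(e -> new TableRow().set("id", e.getId()))
        // ElementT -> TableRow function used only for rows that fail to
        // write, so the error output carries a row built from the original
        // element rather than from the converted payload.
        .withFormatRecordOnFailureFunction(
            e -> new TableRow().set("id", e.getId()).set("raw", e.toString()))
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API));
```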
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
- Update CHANGES.md with noteworthy changes.

See the Contributor Guide for more tips on how to make the review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.