Add argument checks and tests for BQ StorageAPI sinks. #27213
Conversation
Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`.

R: @lostluck added as fallback since no labels match configuration

The PR bot will only process comments in the main thread (not review comments).
Internally, we will decide whether to call withSchema() with a schema of shuffled fields based on this option.
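For illustration, that decision might look roughly like this (a sketch: the option accessor `getUseShuffledSchema` is a hypothetical name, while `BigQueryIO.writeTableRows().withSchema(...)` is the existing API):

```java
// Sketch: choose between the original schema and a field-shuffled copy,
// based on a (hypothetical) test pipeline option.
TableSchema schemaToUse = originalSchema;
if (options.getUseShuffledSchema()) {
  List<TableFieldSchema> shuffled = new ArrayList<>(originalSchema.getFields());
  Collections.shuffle(shuffled, new Random(RANDOM_SEED));
  schemaToUse = new TableSchema().setFields(shuffled);
}
BigQueryIO.Write<TableRow> write =
    BigQueryIO.writeTableRows().to(tableSpec).withSchema(schemaToUse);
```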
* Fix a few typos in the method name STORAGE_WRITE_API.
* Change the warning message when both numStorageWriteApiStreams and autoSharding are set; in this case, autoSharding takes priority.
* Add an argument check for using both numFileShards and autoSharding via FILE_LOADS (see the sketch below).
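A sketch of what that last check could look like, following Beam's usual `checkArgument` pattern (the accessor names and message wording here are illustrative, not the PR's exact code):

```java
// Sketch: numFileShards and auto-sharding are mutually exclusive for FILE_LOADS.
if (method == Method.FILE_LOADS && getAutoSharding()) {
  checkArgument(
      getNumFileShards() == 0,
      "Only one of numFileShards or autoSharding can be set when using FILE_LOADS.");
}
```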
Run Java_GCP_IO_Direct PreCommit
Reminder, please take a look at this PR: @lostluck
Assigning a new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`.

R: @lostluck added as fallback since no labels match configuration
Thanks shunping! I left some comments.

One general nit: some of the `...but the collection was {} and the method was {}` warnings are a bit verbose and give too much information, which may be confusing when debugging. I think some could be more concise and include only the relevant mismatch.
```java
        .execute();
    LOG.info("Successfully updated the schema of table: " + tableId);
  } catch (Exception e) {
    LOG.debug("Exceptions caught when updating table schema: " + e.getMessage());
```
Severity should maybe be raised to info/warning for visibility?
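For instance, a minimal suggested change along those lines:

```java
// Suggested: surface schema-update failures at WARN instead of DEBUG.
LOG.warn("Exceptions caught when updating table schema: " + e.getMessage());
```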
```java
    Collections.shuffle(fieldNamesShuffled, new Random(RANDOM_SEED));

    // The updated schema includes all fields in the original schema plus a random new field
    List<String> fieldNamesWithExtra = new ArrayList<String>(fieldNamesOrigin);
    Random r = new Random(RANDOM_SEED);
```
`RANDOM_SEED` is constant; wouldn't this result in the same "randomly" generated numbers for each run?
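One common way to address this (a sketch, not the PR's code) is to derive the seed per run and log it, so a failing run can still be replayed deterministically:

```java
// Sketch: a per-run seed keeps shuffles different across runs, while logging
// the seed lets a failing run be reproduced.
long seed = System.currentTimeMillis();
LOG.info("Shuffling schema fields with seed {}", seed);
Collections.shuffle(fieldNamesShuffled, new Random(seed));
```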
```java
    } else if (method != Method.STREAMING_INSERTS) {
      LOG.warn(
          "The setting of auto-sharding is ignored. It is only supported when writing an"
              + " unbounded PCollection via FILE_LOADS, STREAMING_INSERTS or"
              + " STORAGE_WRITE_API, but the collection was {} and the method was {}.",
          input.isBounded(),
          method);
    }
```
I think at this point it makes more sense to check `method == STORAGE_API_AT_LEAST_ONCE` and have a warning message specific to that method. It also needs a check here that auto-sharding is enabled.
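A sketch of that suggested branch (the accessor name is illustrative; this would replace the `else if` shown above):

```java
// Sketch: warn only when auto-sharding was actually requested, with a
// message specific to STORAGE_API_AT_LEAST_ONCE.
} else if (method == Method.STORAGE_API_AT_LEAST_ONCE && getAutoSharding()) {
  LOG.warn(
      "The setting of auto-sharding is ignored: it is not supported when writing via"
          + " STORAGE_API_AT_LEAST_ONCE.");
}
```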
"The setting of storageApiTriggeringFrequency in BigQueryOptions is ignored." | ||
+ " It is only supported when writing an unbounded PCollection via" | ||
+ " STORAGE_WRITE_API, but the collection was {} and the method was {}.", |
Nit: could be more concise; using `STORAGE_WRITE_API` isn't the problem here.
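For example (illustrative wording, assuming this branch fires when the input is bounded):

```java
// Illustrative, more concise message: name only the relevant mismatch.
LOG.warn(
    "The setting of storageApiTriggeringFrequency in BigQueryOptions is ignored:"
        + " it only takes effect for an unbounded input PCollection, but the input was {}.",
    input.isBounded());
```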
Left another comment. Also, on another look, I think we need a few more tests to:
- verify that unknown values are ignored
- verify that Beam responds well when schema field types are also updated (e.g. a field going from "required" to "nullable"); see the sketch below
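For the second case, the before/after schemas could be built roughly like this (a sketch using `com.google.api.services.bigquery.model.TableSchema` and `TableFieldSchema`; not the PR's actual test code):

```java
// Original schema: field "name" is REQUIRED.
TableSchema original =
    new TableSchema()
        .setFields(
            Arrays.asList(
                new TableFieldSchema().setName("name").setType("STRING").setMode("REQUIRED")));

// Updated schema: the same field relaxed to NULLABLE, a change BigQuery
// allows on an existing table.
TableSchema relaxed =
    new TableSchema()
        .setFields(
            Arrays.asList(
                new TableFieldSchema().setName("name").setType("STRING").setMode("NULLABLE")));
```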
```java
    try {
      rowCount = this.getRowCount(this.projectId + "." + this.datasetId + "." + this.tableId);
    } catch (Exception e) {
      LOG.error(e.toString());
```
P.S. Maybe we should rethrow the error here instead of just logging it? If `getRowCount` starts failing for whatever reason, `rowCount` will stay at 0 and the schema will never be updated, making the tests obsolete (since the purpose is to test the schema update feature).
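A sketch of the rethrow (wrapping in `RuntimeException` is just one option):

```java
// Sketch: fail fast so a broken getRowCount can't silently leave rowCount at 0.
try {
  rowCount = this.getRowCount(this.projectId + "." + this.datasetId + "." + this.tableId);
} catch (Exception e) {
  throw new RuntimeException("Failed to get row count; cannot verify schema update.", e);
}
```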
It looks like this is already getting a review, so I'm going to bow out (I've been on vacation for most of June, hence no actions). But if you need a 2nd look or a committer for merging, please feel free to ping me again.
R: @ahmedabu98 |
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
Some argument checks are added to ensure the options specified by users are valid.
In addition, integration tests on a small-scale unbounded data source have been added. In these tests, we fuzz-test the schema order by shuffling fields in the BigQuery write schema. We also trigger an update to the table schema while data streaming is ongoing, to simulate a potential data-loss problem caused by a BigQuery schema change.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

- Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
- Update `CHANGES.md` with noteworthy changes.

See the Contributor Guide for more tips on how to make the review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.